
Commit 3ba5aaa

chenghao-intel authored and marmbrus committed
[SPARK-5213] [SQL] Pluggable SQL Parser Support
This PR aims to make the SQL parser pluggable, so that users can register their own parsers via the Spark SQL CLI:

```
# add the jar into the classpath
hcheng@mydesktop:spark> bin/spark-sql --jars sql99.jar

-- switch to the "hiveql" dialect
spark-sql> SET spark.sql.dialect=hiveql;
spark-sql> SELECT * FROM src LIMIT 1;

-- switch to the "sql" dialect
spark-sql> SET spark.sql.dialect=sql;
spark-sql> SELECT * FROM src LIMIT 1;

-- switch to a custom dialect
spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
spark-sql> SELECT * FROM src LIMIT 1;

-- set a non-existent SQL dialect
spark-sql> SET spark.sql.dialect=NotExistedClass;
spark-sql> SELECT * FROM src LIMIT 1;
-- an exception will be thrown, and the dialect reverts to the default
-- ("sql" for SQLContext, "hiveql" for HiveContext)
```

Author: Cheng Hao <[email protected]>

Closes apache#4015 from chenghao-intel/sqlparser and squashes the following commits:

493775c [Cheng Hao] update the code as feedback
81a731f [Cheng Hao] remove the unnecessary comment
aab0b0b [Cheng Hao] polish the code a little bit
49b9d81 [Cheng Hao] shrink the comment for rebasing
1 parent e991255 commit 3ba5aaa
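For context, a jar like the `sql99.jar` mentioned above would contain a class implementing the `Dialect` abstraction introduced by this patch. A minimal sketch of such a class — the package, class name, and parser internals are hypothetical placeholders, not part of this commit:

```scala
package com.xxx.xxx

import org.apache.spark.sql.catalyst.Dialect
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical SQL99 dialect. Any class with a no-arg constructor that
// extends Dialect can be named in spark.sql.dialect once its jar is on
// the classpath (e.g. via --jars sql99.jar).
class SQL99Dialect extends Dialect {
  override def parse(sqlText: String): LogicalPlan = {
    // A real implementation would run a SQL99 grammar here and return the
    // resulting logical plan; this sketch leaves that unimplemented.
    throw new UnsupportedOperationException(s"cannot parse: $sqlText")
  }
}
```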

9 files changed: +199 −42 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala

Lines changed: 5 additions & 6 deletions
```diff
@@ -25,10 +25,6 @@ import scala.util.parsing.input.CharArrayReader.EofCh
 
 import org.apache.spark.sql.catalyst.plans.logical._
 
-private[sql] object KeywordNormalizer {
-  def apply(str: String): String = str.toLowerCase()
-}
-
 private[sql] abstract class AbstractSparkSQLParser
   extends StandardTokenParsers with PackratParsers {
 
@@ -42,7 +38,7 @@ private[sql] abstract class AbstractSparkSQLParser
   }
 
   protected case class Keyword(str: String) {
-    def normalize: String = KeywordNormalizer(str)
+    def normalize: String = lexical.normalizeKeyword(str)
     def parser: Parser[String] = normalize
   }
 
@@ -90,13 +86,16 @@ class SqlLexical extends StdLexical {
     reserved ++= keywords
   }
 
+  /* Normalize the keyword string. */
+  def normalizeKeyword(str: String): String = str.toLowerCase
+
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
     ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
   )
 
   protected override def processIdent(name: String) = {
-    val token = KeywordNormalizer(name)
+    val token = normalizeKeyword(name)
     if (reserved contains token) Keyword(token) else Identifier(name)
   }
 
```

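Moving normalization from the `KeywordNormalizer` object onto `SqlLexical` keeps the behavior but makes it an overridable hook: keywords and identifiers are lowercased before being checked against the reserved set. A standalone illustration of that contract (not Spark code, just the lowercasing logic in isolation):

```scala
// Mimics SqlLexical.normalizeKeyword: keyword matching is case-insensitive
// because both the reserved set and incoming identifiers are lowercased.
def normalizeKeyword(str: String): String = str.toLowerCase

val reserved = Set("select", "from", "limit")
assert(Seq("SELECT", "Select", "select").map(normalizeKeyword).forall(reserved.contains))
assert(!reserved.contains(normalizeKeyword("myColumn")))  // stays an identifier
```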
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala

Lines changed: 33 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * Root class of SQL parser dialects. Binary compatibility is not guaranteed
 * across future releases, so this should be treated as an internal interface
 * for advanced users.
 */
@DeveloperApi
abstract class Dialect {
  // The main function, implemented by the SQL parser.
  def parse(sqlText: String): LogicalPlan
}
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -38,6 +38,8 @@ package object errors {
     }
   }
 
+  class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)
+
   /**
    * Wraps any exceptions that are thrown while executing `f` in a
    * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
```

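`DialectException` wraps the underlying instantiation failure, so a caller can tell a misconfigured dialect apart from an ordinary parse error. A hypothetical catch site, assuming a `sqlContext` whose `spark.sql.dialect` points at a missing class:

```scala
import org.apache.spark.sql.catalyst.errors.DialectException

try {
  sqlContext.sql("SELECT 1")
} catch {
  case e: DialectException =>
    // By this point SQLContext has already reset spark.sql.dialect to the
    // default, so simply retrying the query is safe.
    println(s"Dialect misconfigured: ${e.getMessage}")
}
```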
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 68 additions & 14 deletions
```diff
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
 import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
+import scala.util.control.NonFatal
 
 import com.google.common.reflect.TypeToken
 
@@ -32,9 +33,11 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.Dialect
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -44,6 +47,45 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
 
+/**
+ * Currently we support the default dialect named "sql", associated with the class
+ * [[DefaultDialect]].
+ *
+ * A custom SQL dialect can also be provided, for example in the Spark SQL CLI:
+ * {{{
+ *   -- switch to the "hiveql" dialect
+ *   spark-sql> SET spark.sql.dialect=hiveql;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *   -- switch to the "sql" dialect
+ *   spark-sql> SET spark.sql.dialect=sql;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *   -- register a new SQL dialect
+ *   spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *   -- set a non-existent SQL dialect
+ *   spark-sql> SET spark.sql.dialect=NotExistedClass;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *   -- An exception will be thrown and the dialect will revert to
+ *   -- "sql" (for SQLContext) or "hiveql" (for HiveContext).
+ * }}}
+ */
+private[spark] class DefaultDialect extends Dialect {
+  @transient
+  protected val sqlParser = {
+    val catalystSqlParser = new catalyst.SqlParser
+    new SparkSQLParser(catalystSqlParser.parse)
+  }
+
+  override def parse(sqlText: String): LogicalPlan = {
+    sqlParser.parse(sqlText)
+  }
+}
+
 /**
  * The entry point for working with structured data (rows and columns) in Spark. Allows the
  * creation of [[DataFrame]] objects as well as the execution of SQL queries.
@@ -132,17 +174,27 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
 
   @transient
-  protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
-
-  @transient
-  protected[sql] val sqlParser = {
-    val fallback = new catalyst.SqlParser
-    new SparkSQLParser(fallback.parse(_))
+  protected[sql] val ddlParser = new DDLParser((sql: String) => { getSQLDialect().parse(sql) })
+
+  protected[sql] def getSQLDialect(): Dialect = {
+    try {
+      val clazz = Utils.classForName(dialectClassName)
+      clazz.newInstance().asInstanceOf[Dialect]
+    } catch {
+      case NonFatal(e) =>
+        // Since the configured SQL dialect is unavailable, even the SET command would fail
+        // (e.g. SET spark.sql.dialect=sql), so reset to the default dialect automatically.
+        val dialect = conf.dialect
+        // reset the sql dialect
+        conf.unsetConf(SQLConf.DIALECT)
+        // Throw the exception; the default sql dialect will take effect for the next query.
+        throw new DialectException(
+          s"""Instantiating dialect '$dialect' failed.
+             |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
+    }
   }
 
-  protected[sql] def parseSql(sql: String): LogicalPlan = {
-    ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql))
-  }
+  protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
 
   protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
 
@@ -156,6 +208,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val defaultSession = createSession()
 
+  protected[sql] def dialectClassName = if (conf.dialect == "sql") {
+    classOf[DefaultDialect].getCanonicalName
+  } else {
+    conf.dialect
+  }
+
   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
     case _ =>
@@ -945,11 +1003,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group basic
    */
   def sql(sqlText: String): DataFrame = {
-    if (conf.dialect == "sql") {
-      DataFrame(this, parseSql(sqlText))
-    } else {
-      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
-    }
+    DataFrame(this, parseSql(sqlText))
   }
 
   /**
```

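With these changes, switching dialects on a `SQLContext` is purely a configuration change. A usage sketch, assuming an existing `SparkContext` named `sc` and a hypothetical `com.example.MyDialect` already on the classpath:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext

// The built-in alias "sql" resolves to DefaultDialect...
sqlContext.setConf("spark.sql.dialect", "sql")
sqlContext.sql("SELECT 1").show()

// ...while any other value is treated as the fully qualified name of a
// Dialect subclass with a no-arg constructor.
sqlContext.setConf("spark.sql.dialect", "com.example.MyDialect")
sqlContext.sql("SELECT 1").show()  // now parsed by MyDialect
```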
sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -38,12 +38,12 @@ private[sql] class DDLParser(
     parseQuery: String => LogicalPlan)
   extends AbstractSparkSQLParser with DataTypeParser with Logging {
 
-  def parse(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = {
+  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
     try {
-      Some(parse(input))
+      parse(input)
     } catch {
       case ddlException: DDLException => throw ddlException
-      case _ if !exceptionOnError => None
+      case _ if !exceptionOnError => parseQuery(input)
       case x: Throwable => throw x
     }
   }
```

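The net effect of the `DDLParser` change is that the parse pipeline no longer returns an `Option`: DDL is tried first, and any non-DDL input flows into the `parseQuery` callback, which `SQLContext` wires to `getSQLDialect().parse`. Roughly, with names from the diff and a hypothetical `tryParseAsDDL` standing in for DDLParser's grammar:

```scala
// Simplified control flow of SQLContext.parseSql after this patch
// (a sketch, not the real code).
def parseSql(sql: String): LogicalPlan =
  try {
    tryParseAsDDL(sql)  // e.g. CREATE TEMPORARY TABLE ... USING ...
  } catch {
    case e: DDLException => throw e  // real DDL that is malformed: fail fast
    case _: Throwable => getSQLDialect().parse(sql)  // ordinary query: delegate
  }
```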
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 22 additions & 0 deletions
```diff
@@ -19,13 +19,18 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.GeneratedAggregate
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
+
 import org.apache.spark.sql.types._
 
+/** A SQL dialect for testing purposes; it cannot be a nested type. */
+class MyDialect extends DefaultDialect
+
 class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   // Make sure the tables are loaded.
   TestData
@@ -64,6 +69,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
   }
 
+  test("SQL Dialect Switching to a new SQL parser") {
+    val newContext = new SQLContext(TestSQLContext.sparkContext)
+    newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
+    assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
+    assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
+  }
+
+  test("SQL Dialect Switch to an invalid parser with alias") {
+    val newContext = new SQLContext(TestSQLContext.sparkContext)
+    newContext.sql("SET spark.sql.dialect=MyTestClass")
+    intercept[DialectException] {
+      newContext.sql("SELECT 1")
+    }
+    // check that the dialect is set back to DefaultDialect
+    assert(newContext.getSQLDialect().getClass === classOf[DefaultDialect])
+  }
+
   test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
     checkAnswer(
       sql("SELECT a FROM testData2 SORT BY a"),
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 25 additions & 16 deletions
```diff
@@ -20,6 +20,9 @@ package org.apache.spark.sql.hive
 import java.io.{BufferedReader, InputStreamReader, PrintStream}
 import java.sql.Timestamp
 
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.spark.sql.catalyst.Dialect
+
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 
@@ -42,6 +45,15 @@ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNative
 import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
+/**
+ * The HiveQL dialect; this dialect is strongly bound to HiveContext.
+ */
+private[hive] class HiveQLDialect extends Dialect {
+  override def parse(sqlText: String): LogicalPlan = {
+    HiveQl.parseSql(sqlText)
+  }
+}
+
 /**
  * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
  * Configuration for Hive is read from hive-site.xml on the classpath.
@@ -81,25 +93,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertCTAS: Boolean =
     getConf("spark.sql.hive.convertCTAS", "false").toBoolean
 
-  override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
-    new this.QueryExecution(plan)
-
   @transient
-  protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
-
-  override def sql(sqlText: String): DataFrame = {
-    val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
-    // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
-    if (conf.dialect == "sql") {
-      super.sql(substituted)
-    } else if (conf.dialect == "hiveql") {
-      val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false)
-      DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
-    } else {
-      sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
-    }
+  protected[sql] lazy val substitutor = new VariableSubstitution()
+
+  protected[sql] override def parseSql(sql: String): LogicalPlan = {
+    super.parseSql(substitutor.substitute(hiveconf, sql))
   }
 
+  override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
+    new this.QueryExecution(plan)
+
   /**
    * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a
@@ -356,6 +359,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 
+  override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") {
+    classOf[HiveQLDialect].getCanonicalName
+  } else {
+    super.dialectClassName
+  }
+
   @transient
   private val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
```

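`HiveContext` thus layers one alias on top of `SQLContext`'s resolution: "hiveql" maps to `HiveQLDialect`, "sql" still maps to `DefaultDialect`, and anything else falls through as a class name. A sketch of exercising this, assuming an existing `HiveContext` named `hiveContext` (e.g. TestHive) and code running inside Spark's own sql/hive packages, since `getSQLDialect` and the dialect classes are not public:

```scala
// The "hiveql" alias resolves to HiveQLDialect...
hiveContext.setConf("spark.sql.dialect", "hiveql")
assert(hiveContext.getSQLDialect().getClass == classOf[HiveQLDialect])

// ...and "sql" still resolves to the inherited DefaultDialect.
hiveContext.setConf("spark.sql.dialect", "sql")
assert(hiveContext.getSQLDialect().getClass == classOf[DefaultDialect])
```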
sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -107,7 +107,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   /** Fewer partitions to speed up testing. */
   protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
-    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+
+    // TODO: in unit tests conf.clear() may be called, clearing all values, and
+    // super.getConf(SQLConf.DIALECT) is "sql" by default; we need it to be "hiveql".
+    override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
   }
 }
```
