
Commit e522971

yhuai authored and marmbrus committed
[SPARK-2339][SQL] SQL parser in sql-core is case sensitive, but a table alias is converted to lower case when we create Subquery

Reported at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Join-throws-exception-td8599.html

After we get the table from the catalog, because the table has an alias, we temporarily insert a Subquery. Then, we convert the table alias to lower case whether or not the parser is case sensitive. To see the issue:

```scala
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

case class Person(name: String, age: Int)

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")

sqlContext.sql("select PEOPLE.name from people PEOPLE")
```

The plan is:

```
== Query Plan ==
Project ['PEOPLE.name]
 ExistingRdd [name#0,age#1], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:176
```

You can see that `PEOPLE.name` is not resolved.

This PR introduces three changes.
1. If a table has an alias, the catalog will not lowercase the alias. If a lowercase alias is needed, the analyzer will do the work.
2. A catalog has a new val caseSensitive that indicates whether it is case sensitive. For example, the SimpleCatalog constructed by SQLContext is case sensitive, but the HiveMetastoreCatalog is not.
3. Corresponding unit tests.

With this PR, case sensitivity of database names and table names is handled by the catalog, and case sensitivity of other identifiers is handled by the analyzer.

JIRA: https://issues.apache.org/jira/browse/SPARK-2339

Author: Yin Huai <[email protected]>

Closes #1317 from yhuai/SPARK-2339 and squashes the following commits:

12d8006 [Yin Huai] Handling case sensitivity correctly.

(cherry picked from commit c0b4cf0)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent 5044ba6 commit e522971
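
To make the new division of labor concrete, here is a minimal, self-contained sketch (illustrative only, not code from this commit) of the rule that `processDatabaseAndTableName` implements: database and table names are lowercased only when the catalog is case insensitive, and table aliases are never touched at this layer, since the analyzer owns them.

```scala
// Standalone sketch of the catalog-side normalization rule introduced by this
// patch. The object and free-standing method here are invented for
// illustration; the real logic lives in the Catalog trait shown below.
object CaseHandlingSketch {
  def processDatabaseAndTableName(
      caseSensitive: Boolean,
      databaseName: Option[String],
      tableName: String): (Option[String], String) = {
    if (!caseSensitive) {
      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
    } else {
      (databaseName, tableName)
    }
  }

  def main(args: Array[String]): Unit = {
    // caseSensitive = false (e.g. HiveMetastoreCatalog): names are normalized.
    assert(processDatabaseAndTableName(false, Some("Default"), "TaBlE")
      == (Some("default"), "table"))
    // caseSensitive = true (e.g. the SimpleCatalog built by SQLContext):
    // names pass through unchanged.
    assert(processDatabaseAndTableName(true, Some("Default"), "TaBlE")
      == (Some("Default"), "TaBlE"))
  }
}
```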

File tree

6 files changed: +149 -30 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 43 additions & 12 deletions
```diff
@@ -25,6 +25,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
  * An interface for looking up relations by name. Used by an [[Analyzer]].
  */
 trait Catalog {
+
+  def caseSensitive: Boolean
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -35,22 +38,44 @@ trait Catalog {
   def unregisterTable(databaseName: Option[String], tableName: String): Unit
 
   def unregisterAllTables(): Unit
+
+  protected def processDatabaseAndTableName(
+      databaseName: Option[String],
+      tableName: String): (Option[String], String) = {
+    if (!caseSensitive) {
+      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
+    } else {
+      (databaseName, tableName)
+    }
+  }
+
+  protected def processDatabaseAndTableName(
+      databaseName: String,
+      tableName: String): (String, String) = {
+    if (!caseSensitive) {
+      (databaseName.toLowerCase, tableName.toLowerCase)
+    } else {
+      (databaseName, tableName)
+    }
+  }
 }
 
-class SimpleCatalog extends Catalog {
+class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
   override def registerTable(
       databaseName: Option[String],
       tableName: String,
       plan: LogicalPlan): Unit = {
-    tables += ((tableName, plan))
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    tables += ((tblName, plan))
   }
 
   override def unregisterTable(
       databaseName: Option[String],
       tableName: String) = {
-    tables -= tableName
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    tables -= tblName
   }
 
   override def unregisterAllTables() = {
@@ -61,12 +86,13 @@ class SimpleCatalog extends Catalog {
       databaseName: Option[String],
       tableName: String,
       alias: Option[String] = None): LogicalPlan = {
-    val table = tables.get(tableName).getOrElse(sys.error(s"Table Not Found: $tableName"))
-    val tableWithQualifiers = Subquery(tableName, table)
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    val table = tables.get(tblName).getOrElse(sys.error(s"Table Not Found: $tableName"))
+    val tableWithQualifiers = Subquery(tblName, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
-    alias.map(a => Subquery(a.toLowerCase, tableWithQualifiers)).getOrElse(tableWithQualifiers)
+    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
   }
 }
 
@@ -85,26 +111,28 @@ trait OverrideCatalog extends Catalog {
       databaseName: Option[String],
       tableName: String,
       alias: Option[String] = None): LogicalPlan = {
-
-    val overriddenTable = overrides.get((databaseName, tableName))
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    val overriddenTable = overrides.get((dbName, tblName))
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
     // properly qualified with this alias.
     val withAlias =
-      overriddenTable.map(r => alias.map(a => Subquery(a.toLowerCase, r)).getOrElse(r))
+      overriddenTable.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))
 
-    withAlias.getOrElse(super.lookupRelation(databaseName, tableName, alias))
+    withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
   }
 
   override def registerTable(
       databaseName: Option[String],
       tableName: String,
       plan: LogicalPlan): Unit = {
-    overrides.put((databaseName, tableName), plan)
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    overrides.put((dbName, tblName), plan)
   }
 
   override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
-    overrides.remove((databaseName, tableName))
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    overrides.remove((dbName, tblName))
   }
 
   override def unregisterAllTables(): Unit = {
@@ -117,6 +145,9 @@ trait OverrideCatalog extends Catalog {
  * relations are already filled in and the analyser needs only to resolve attribute references.
  */
 object EmptyCatalog extends Catalog {
+
+  val caseSensitive: Boolean = true
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
```
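
A hedged usage sketch of the patched API above. The relation construction mirrors the new AnalysisSuite below; the constructor flag now decides whether lookup names must match the registration spelling exactly.

```scala
import org.apache.spark.sql.catalyst.analysis.SimpleCatalog
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.IntegerType

// A single-column relation, built the same way as in AnalysisSuite.
val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())

// Case insensitive: "TaBlE" is stored as "table", so any casing resolves.
val insensitive = new SimpleCatalog(caseSensitive = false)
insensitive.registerTable(None, "TaBlE", relation)
insensitive.lookupRelation(None, "tAbLe")            // found

// Case sensitive: only the exact registration spelling matches.
val sensitive = new SimpleCatalog(caseSensitive = true)
sensitive.registerTable(None, "TaBlE", relation)
sensitive.lookupRelation(None, "TaBlE")              // found
// sensitive.lookupRelation(None, "table")           // sys.error("Table Not Found: table")
```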

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 61 additions & 8 deletions
```diff
@@ -17,28 +17,81 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}
 
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types.IntegerType
 
-/* Implicit conversions */
-import org.apache.spark.sql.catalyst.dsl.expressions._
+class AnalysisSuite extends FunSuite with BeforeAndAfter {
+  val caseSensitiveCatalog = new SimpleCatalog(true)
+  val caseInsensitiveCatalog = new SimpleCatalog(false)
+  val caseSensitiveAnalyze =
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+  val caseInsensitiveAnalyze =
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
 
-class AnalysisSuite extends FunSuite {
-  val analyze = SimpleAnalyzer
+  val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
 
-  val testRelation = LocalRelation('a.int)
+  before {
+    caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
+    caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
+  }
 
   test("analyze project") {
     assert(
-      analyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
+      caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
+        Project(testRelation.output, testRelation))
+
+    assert(
+      caseSensitiveAnalyze(
+        Project(Seq(UnresolvedAttribute("TbL.a")),
+          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+        Project(testRelation.output, testRelation))
+
+    val e = intercept[TreeNodeException[_]] {
+      caseSensitiveAnalyze(
+        Project(Seq(UnresolvedAttribute("tBl.a")),
+          UnresolvedRelation(None, "TaBlE", Some("TbL"))))
+    }
+    assert(e.getMessage().toLowerCase.contains("unresolved"))
+
+    assert(
+      caseInsensitiveAnalyze(
+        Project(Seq(UnresolvedAttribute("TbL.a")),
+          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
       Project(testRelation.output, testRelation))
+
+    assert(
+      caseInsensitiveAnalyze(
+        Project(Seq(UnresolvedAttribute("tBl.a")),
+          UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
+        Project(testRelation.output, testRelation))
+  }
+
+  test("resolve relations") {
+    val e = intercept[RuntimeException] {
+      caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
+    }
+    assert(e.getMessage === "Table Not Found: tAbLe")
+
+    assert(
+      caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+        testRelation)
+
+    assert(
+      caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
+        testRelation)
+
+    assert(
+      caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
+        testRelation)
   }
 
   test("throw errors for unresolved attributes during analysis") {
     val e = intercept[TreeNodeException[_]] {
-      analyze(Project(Seq(UnresolvedAttribute("abcd")), testRelation))
+      caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("abcd")), testRelation))
     }
     assert(e.getMessage().toLowerCase.contains("unresolved"))
   }
```

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -57,7 +57,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   self =>
 
   @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog
+  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
   @transient
   protected[sql] lazy val analyzer: Analyzer =
     new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true)
```
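
SQLContext stays fully case sensitive because the catalog flag and the analyzer flag agree. As a purely hypothetical sketch (not part of this commit, class name invented), a context that wanted case-insensitive resolution would now flip both flags together:

```scala
package org.apache.spark.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, EmptyFunctionRegistry, SimpleCatalog}

// Hypothetical subclass: catalog and analyzer must share one case-sensitivity
// setting, otherwise table lookup and attribute resolution would disagree.
class CaseInsensitiveSQLContext(sc: SparkContext) extends SQLContext(sc) {
  @transient
  override protected[sql] lazy val catalog: Catalog = new SimpleCatalog(false)

  @transient
  override protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
}
```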

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 14 additions & 9 deletions
```diff
@@ -46,12 +46,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val client = Hive.get(hive.hiveconf)
 
+  val caseSensitive: Boolean = false
+
   def lookupRelation(
     db: Option[String],
     tableName: String,
     alias: Option[String]): LogicalPlan = {
-    val databaseName = db.getOrElse(hive.sessionState.getCurrentDatabase)
-    val table = client.getTable(databaseName, tableName)
+    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+    val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+    val table = client.getTable(databaseName, tblName)
     val partitions: Seq[Partition] =
       if (table.isPartitioned) {
         client.getAllPartitionsForPruner(table).toSeq
@@ -61,8 +64,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     // Since HiveQL is case insensitive for table names we make them all lowercase.
     MetastoreRelation(
-      databaseName.toLowerCase,
-      tableName.toLowerCase,
+      databaseName,
+      tblName,
       alias)(table.getTTable, partitions.map(part => part.getTPartition))
   }
 
@@ -71,7 +74,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     tableName: String,
     schema: Seq[Attribute],
     allowExisting: Boolean = false): Unit = {
-    val table = new Table(databaseName, tableName)
+    val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
+    val table = new Table(dbName, tblName)
     val hiveSchema =
       schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
     table.setFields(hiveSchema)
@@ -86,7 +90,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
     sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
     val serDeInfo = new SerDeInfo()
-    serDeInfo.setName(tableName)
+    serDeInfo.setName(tblName)
     serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
     serDeInfo.setParameters(Map[String, String]())
     sd.setSerdeInfo(serDeInfo)
@@ -105,13 +109,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   object CreateTables extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case InsertIntoCreatedTable(db, tableName, child) =>
-        val databaseName = db.getOrElse(hive.sessionState.getCurrentDatabase)
+        val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+        val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        createTable(databaseName, tableName, child.output)
+        createTable(databaseName, tblName, child.output)
 
         InsertIntoTable(
           EliminateAnalysisOperators(
-            lookupRelation(Some(databaseName), tableName, None)),
+            lookupRelation(Some(databaseName), tblName, None)),
           Map.empty,
           child,
           overwrite = false)
```
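
The end-to-end effect, sketched below under the assumption of a local Hive deployment that already has the classic `src` test table: the catalog absorbs table-name case, and the analyzer, which HiveContext runs with caseSensitive = false, absorbs column and alias case.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Hedged sketch; assumes a Hive metastore reachable from this process and a
// populated default.src table (as in Spark's own Hive test fixtures).
val sc = new SparkContext("local", "case-sensitivity-sketch")
val hiveContext = new HiveContext(sc)

// Both queries hit the same metastore entry: the catalog lowercases the table
// name before client.getTable is called, while alias and column case is left
// to the case-insensitive analyzer.
hiveContext.hql("SELECT s.key FROM src s WHERE s.key < 15").collect()
hiveContext.hql("SELECT S.KEY FROM SRC S WHERE s.key < 15").collect()
```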
sql/hive/src/test/resources/golden/case sensitivity: Hive table-0-…

Lines changed: 14 additions & 0 deletions

This new golden answer file records the expected result of the "case sensitivity: Hive table" query test added in HiveQuerySuite below: the key/value pairs from Hive's standard `src` test table whose key is less than 15.

```diff
@@ -0,0 +1,14 @@
+0	val_0
+4	val_4
+12	val_12
+8	val_8
+0	val_0
+0	val_0
+10	val_10
+5	val_5
+11	val_11
+5	val_5
+2	val_2
+12	val_12
+5	val_5
+9	val_9
```

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -210,6 +210,22 @@ class HiveQuerySuite extends HiveComparisonTest {
     }
   }
 
+  createQueryTest("case sensitivity: Hive table",
+    "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15")
+
+  test("case sensitivity: registered table") {
+    val testData: SchemaRDD =
+      TestHive.sparkContext.parallelize(
+        TestData(1, "str1") ::
+        TestData(2, "str2") :: Nil)
+    testData.registerAsTable("REGisteredTABle")
+
+    assertResult(Array(Array(2, "str2"))) {
+      hql("SELECT tablealias.A, TABLEALIAS.b FROM reGisteredTABle TableAlias " +
+        "WHERE TableAliaS.a > 1").collect()
+    }
+  }
+
   def isExplanation(result: SchemaRDD) = {
     val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
     explanation.size > 1 && explanation.head.startsWith("Physical execution plan")
```
