
Commit 2668383

rxin authored and yhuai committed
---
yaml --- r: 15803 b: refs/heads/master c: e3c1366 h: refs/heads/master i: 15801: fddac72 15799: ad56bb6
1 parent a8f8837 commit 2668383

4 files changed, +101 -72 lines changed

[refs]

Lines changed: 1 addition & 1 deletion

@@ -1,2 +1,2 @@
 ---
-refs/heads/master: 890abd1279014d692548c9f3b557483644a0ee32
+refs/heads/master: e3c1366bbcf712f8d7a91640eb11e67a4419e4be

trunk/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala

Lines changed: 2 additions & 1 deletion

@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
  * supported by this builder (yet).
  */
 class SQLBuilder(logicalPlan: LogicalPlan) extends Logging {
-  require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
+  require(logicalPlan.resolved,
+    "SQLBuilder only supports resolved logical query plans. Current plan:\n" + logicalPlan)

   def this(df: Dataset[_]) = this(df.queryExecution.analyzed)

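The only behavioral change in SQLBuilder is the failure message: the require now appends the offending logical plan, so an error on an unresolved plan identifies which plan was passed in. A minimal standalone sketch of the same pattern; Plan and Builder below are invented stand-ins, not Spark classes:

// Sketch of a require(...) whose message embeds the offending value,
// mirroring the SQLBuilder change above. Plan and Builder are made-up stand-ins.
final case class Plan(resolved: Boolean, description: String) {
  override def toString: String = description
}

class Builder(plan: Plan) {
  require(plan.resolved,
    "Builder only supports resolved plans. Current plan:\n" + plan)
}

object BuilderDemo extends App {
  new Builder(Plan(resolved = true, "Project [id]"))   // passes the precondition
  try new Builder(Plan(resolved = false, "'UnresolvedRelation `missing_table`")) catch {
    case e: IllegalArgumentException => println(e.getMessage)  // message names the plan
  }
}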

trunk/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 56 additions & 47 deletions

@@ -63,9 +63,12 @@ case class CreateViewCommand(
   }

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val analzyedPlan = sqlContext.executePlan(child).analyzed
+    // If the plan cannot be analyzed, throw an exception and don't proceed.
+    val qe = sqlContext.executePlan(child)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed

-    require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+    require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
     val sessionState = sqlContext.sessionState

     if (sessionState.catalog.tableExists(tableIdentifier)) {
@@ -74,7 +77,7 @@ case class CreateViewCommand(
         // already exists.
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+        sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -85,68 +88,74 @@ case class CreateViewCommand(
     } else {
       // Create the view if it doesn't exist.
       sessionState.catalog.createTable(
-        prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+        prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
     }

     Seq.empty[Row]
   }

-  private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
-    val expandedText = if (sqlContext.conf.canonicalView) {
-      try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
-        case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
+  /**
+   * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
+   * SQL based on the analyzed plan, and also creates the proper schema for the view.
+   */
+  private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+    val viewSQL: String =
+      if (sqlContext.conf.canonicalView) {
+        val logicalPlan =
+          if (tableDesc.schema.isEmpty) {
+            analyzedPlan
+          } else {
+            val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+              case (attr, col) => Alias(attr, col.name)()
+            }
+            sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+          }
+        new SQLBuilder(logicalPlan).toSQL
+      } else {
+        // When user specified column names for view, we should create a project to do the renaming.
+        // When no column name specified, we still need to create a project to declare the columns
+        // we need, to make us more robust to top level `*`s.
+        val viewOutput = {
+          val columnNames = analyzedPlan.output.map(f => quote(f.name))
+          if (tableDesc.schema.isEmpty) {
+            columnNames.mkString(", ")
+          } else {
+            columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+              case (name, alias) => s"$name AS $alias"
+            }.mkString(", ")
+          }
+        }
+
+        val viewText = tableDesc.viewText.get
+        val viewName = quote(tableDesc.identifier.table)
+        s"SELECT $viewOutput FROM ($viewText) $viewName"
       }
-    } else {
-      wrapViewTextWithSelect(analzyedPlan)
+
+    // Validate the view SQL - make sure we can parse it and analyze it.
+    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
+    try {
+      sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
+    } catch {
+      case NonFatal(e) =>
+        throw new RuntimeException(
          "Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
     }

-    val viewSchema = {
+    val viewSchema: Seq[CatalogColumn] = {
       if (tableDesc.schema.isEmpty) {
-        analzyedPlan.output.map { a =>
+        analyzedPlan.output.map { a =>
           CatalogColumn(a.name, a.dataType.simpleString)
         }
       } else {
-        analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+        analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
           CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
         }
       }
     }

-    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
-  }
-
-  private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
-    // When user specified column names for view, we should create a project to do the renaming.
-    // When no column name specified, we still need to create a project to declare the columns
-    // we need, to make us more robust to top level `*`s.
-    val viewOutput = {
-      val columnNames = analzyedPlan.output.map(f => quote(f.name))
-      if (tableDesc.schema.isEmpty) {
-        columnNames.mkString(", ")
-      } else {
-        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-          case (name, alias) => s"$name AS $alias"
-        }.mkString(", ")
-      }
-    }
-
-    val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.identifier.table)
-    s"SELECT $viewOutput FROM ($viewText) $viewName"
-  }
-
-  private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
-    val logicalPlan = if (tableDesc.schema.isEmpty) {
-      analzyedPlan
-    } else {
-      val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
-        case (attr, col) => Alias(attr, col.name)()
-      }
-      sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
-    }
-    new SQLBuilder(logicalPlan).toSQL
+    tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL))
   }

-  // escape backtick with double-backtick in column name and wrap it with backtick.
+  /** Escape backtick with double-backtick in column name and wrap it with backtick. */
   private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
 }
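For reference, the non-canonical branch above builds the stored view text by wrapping the user's original query in an outer SELECT that pins (and optionally renames) the output columns. A minimal sketch of that string construction, using invented stand-in values rather than anything taken from this commit:

// Illustrative only: the table, view name, and column names below are made-up
// stand-ins showing the shape of the SQL built by prepareTable's non-canonical branch.
object ViewSqlSketch extends App {
  val viewText    = "SELECT * FROM jt WHERE id > 3"   // user's original query text
  val viewName    = "`jtv1`"                          // quoted view identifier
  val columnNames = Seq("`id`", "`id1`")              // quoted output attributes
  val userAliases = Seq("`c1`", "`c2`")               // user-specified column names, if any

  val viewOutput =
    columnNames.zip(userAliases).map { case (n, a) => s"$n AS $a" }.mkString(", ")

  val viewSQL = s"SELECT $viewOutput FROM ($viewText) $viewName"
  println(viewSQL)
  // SELECT `id` AS `c1`, `id1` AS `c2` FROM (SELECT * FROM jt WHERE id > 3) `jtv1`
}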

trunk/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala

Lines changed: 42 additions & 23 deletions

@@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils
 class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext.implicits._

+  override def beforeAll(): Unit = {
+    // Create a simple table with two columns: id and id1
+    sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt")
+  }
+
+  override def afterAll(): Unit = {
+    sqlContext.sql(s"DROP TABLE IF EXISTS jt")
+  }
+
+  test("nested views") {
+    withView("jtv1", "jtv2") {
+      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
+      sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
+      checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+    }
+  }
+
+  test("error handling: fail if the view sql itself is invalid") {
+    // A table that does not exist
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect()
+    }
+
+    // A column that does not exist
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect()
+    }
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt") {
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt")
-        sql(
-          """CREATE VIEW IF NOT EXISTS
-            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-            |TBLPROPERTIES ('a' = 'b')
-            |AS SELECT * FROM jt""".stripMargin)
-        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-        sql("DROP VIEW testView")
-      }
+      sql(
+        """CREATE VIEW IF NOT EXISTS
+          |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+          |TBLPROPERTIES ('a' = 'b')
+          |AS SELECT * FROM jt""".stripMargin)
+      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+      sql("DROP VIEW testView")
     }
   }

   test("correctly handle CREATE VIEW IF NOT EXISTS") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt", "jt2") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+      withTable("jt2") {
         sql("CREATE VIEW testView AS SELECT id FROM jt")

         val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -66,8 +90,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
-          sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        withTable("jt2") {
           sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
           checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))

@@ -90,9 +113,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     test(s"$prefix correctly handle ALTER VIEW") {
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
+        withTable("jt2") {
           withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
             sql("CREATE VIEW testView AS SELECT id FROM jt")

             val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -109,12 +131,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       // json table is not hive-compatible, make sure the new flag fix it.
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt") {
-          withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-            sql("CREATE VIEW testView AS SELECT id FROM jt")
-            checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-          }
+        withView("testView") {
+          sql("CREATE VIEW testView AS SELECT id FROM jt")
+          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
        }
       }
     }
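The expected Row(2) in the new nested-views test follows directly from the shared fixture: jt holds ids 1 through 9 (created once in beforeAll), jtv1 keeps ids greater than 3, and jtv2 then keeps those below 6, leaving ids 4 and 5. A quick plain-collections sketch of that arithmetic, with no Spark involved:

// Re-derives the nested-views expectation with ordinary Scala collections; illustrative only.
object NestedViewCountSketch extends App {
  val jt   = 1 until 10            // ids 1..9, as written by beforeAll's range(1, 10)
  val jtv1 = jt.filter(_ > 3)      // CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3
  val jtv2 = jtv1.filter(_ < 6)    // CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6
  println(jtv2)                    // Vector(4, 5)
  assert(jtv2.size == 2)           // matches checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
}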
