From 509327e303984906ce4c87e459d6323582963089 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 3 Nov 2016 19:42:18 -0700 Subject: [PATCH 1/9] fix. --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/catalog/SessionCatalog.scala | 6 ++-- .../spark/sql/catalyst/dsl/package.scala | 4 +-- .../sql/catalyst/optimizer/subquery.scala | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 4 +-- .../plans/logical/basicLogicalOperators.scala | 5 ++- .../sql/catalyst/analysis/AnalysisSuite.scala | 4 +-- .../catalog/SessionCatalogSuite.scala | 12 +++---- .../optimizer/ColumnPruningSuite.scala | 8 ++--- .../EliminateSubqueryAliasesSuite.scala | 6 ++-- .../optimizer/JoinOptimizationSuite.scala | 8 ++--- .../sql/catalyst/parser/PlanParserSuite.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../spark/sql/catalyst/SQLBuilder.scala | 4 +-- .../spark/sql/execution/command/views.scala | 19 +++++++++--- .../sql/execution/datasources/rules.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +++---- .../spark/sql/hive/HiveSessionCatalog.scala | 6 ++-- .../sql/hive/execution/SQLViewSuite.scala | 31 ++++++++++++++----- 19 files changed, 85 insertions(+), 54 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5011f2fdbf9b7..91883381e1d55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -134,7 +134,7 @@ class Analyzer( case u : UnresolvedRelation => val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table)) .map(_._2).map { relation => - val withAlias = u.alias.map(SubqueryAlias(_, relation, None)) + val withAlias = u.alias.map(SubqueryAlias(_, relation, None)()) withAlias.getOrElse(relation) } substituted.getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 2d2120dda8bde..d3f08e8ab9990 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -553,16 +553,16 @@ class SessionCatalog( val relationAlias = alias.getOrElse(table) if (db == globalTempViewManager.database) { globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name)) + SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempTables.contains(table)) { val metadata = externalCatalog.getTable(db, table) val view = Option(metadata.tableType).collect { case CatalogTableType.VIEW => name } - SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view) + SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)() } else { - SubqueryAlias(relationAlias, tempTables(table), Option(name)) + SubqueryAlias(relationAlias, tempTables(table), Option(name))(isGeneratedByTempTable = true) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index e901683be6854..0be6fcb81f6da 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -347,7 +347,7 @@ package object dsl { orderSpec: Seq[SortOrder]): LogicalPlan = Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) - def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None) + def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)() def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan) @@ -371,7 +371,7 @@ package object dsl { def as(alias: String): LogicalPlan = logicalPlan match { case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias)) - case plan => SubqueryAlias(alias, plan, None) + case plan => SubqueryAlias(alias, plan, None)() } def repartition(num: Integer): LogicalPlan = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index f14aaab72a98f..f59b4e395d991 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -293,7 +293,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { case Project(projList, _) => subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) case s @ SubqueryAlias(alias, _, None) => - subqueryRoot = SubqueryAlias(alias, subqueryRoot, None) + subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)() case op => sys.error(s"Unexpected operator $op in corelated subquery") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 4b151c81d8f8b..fbf75582ee79a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -108,7 +108,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * This is only used for Common Table Expressions. */ override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) { - SubqueryAlias(ctx.name.getText, plan(ctx.query), None) + SubqueryAlias(ctx.name.getText, plan(ctx.query), None)() } /** @@ -729,7 +729,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * Create an alias (SubqueryAlias) for a LogicalPlan. 
*/ private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = { - SubqueryAlias(alias.getText, plan, None) + SubqueryAlias(alias.getText, plan, None)() } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 65ceab2ce27b1..96e6cee3ed770 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -709,10 +709,13 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo case class SubqueryAlias( alias: String, child: LogicalPlan, - view: Option[TableIdentifier]) + view: Option[TableIdentifier])( + val isGeneratedByTempTable: java.lang.Boolean = false) extends UnaryNode { override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) + + override protected def otherCopyArgs: Seq[AnyRef] = isGeneratedByTempTable :: Nil } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 817de48de2798..45372fbb29b14 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -369,8 +369,8 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { val query = Project(Seq($"x.key", $"y.key"), Join( - Project(Seq($"x.key"), SubqueryAlias("x", input, None)), - Project(Seq($"y.key"), SubqueryAlias("y", input, None)), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)()), + Project(Seq($"y.key"), SubqueryAlias("y", input, None)()), Cross, None)) assertAnalysisSuccess(query) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index b77fef225a0c8..e3198c75bc3e8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -406,24 +406,24 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) - == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)) + == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)()) // Otherwise, we'll first look up a temporary table with the same name assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))) + == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))()) // Then, if that does not exist, look up the relation in the current database sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)) + == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)()) } test("lookup table relation with 
alias") { val catalog = new SessionCatalog(newBasicCatalog()) val alias = "monster" val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) - val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None) + val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)() val relationWithAlias = SubqueryAlias(alias, - SimpleCatalogRelation("db2", tableMetadata), None) + SimpleCatalogRelation("db2", tableMetadata), None)() assert(catalog.lookupRelation( TableIdentifier("tbl1", Some("db2")), alias = None) == relation) assert(catalog.lookupRelation( @@ -435,7 +435,7 @@ class SessionCatalogSuite extends SparkFunSuite { val tmpView = Range(1, 10, 2, 10) catalog.createTempView("vw1", tmpView, overrideIfExists = false) val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range")) - assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))) + assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))()) } test("table exists") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 5bd1bc80c3b8a..854e759afa4ef 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -320,16 +320,16 @@ class ColumnPruningSuite extends PlanTest { val query = Project(Seq($"x.key", $"y.key"), Join( - SubqueryAlias("x", input, None), - BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze + SubqueryAlias("x", input, None)(), + BroadcastHint(SubqueryAlias("y", input, None)()), Inner, None)).analyze val optimized = Optimize.execute(query) val expected = Join( - Project(Seq($"x.key"), SubqueryAlias("x", input, None)), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)()), BroadcastHint( - Project(Seq($"y.key"), SubqueryAlias("y", input, None))), + Project(Seq($"y.key"), SubqueryAlias("y", input, None)())), Inner, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala index a8aeedbd62759..35efaf8465698 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala @@ -46,13 +46,13 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { test("eliminate top level subquery") { val input = LocalRelation('a.int, 'b.int) - val query = SubqueryAlias("a", input, None) + val query = SubqueryAlias("a", input, None)() comparePlans(afterOptimization(query), input) } test("eliminate mid-tree subquery") { val input = LocalRelation('a.int, 'b.int) - val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)) + val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)()) comparePlans( afterOptimization(query), Filter(TrueLiteral, LocalRelation('a.int, 'b.int))) @@ -61,7 +61,7 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { test("eliminate multiple subqueries") { val input = LocalRelation('a.int, 'b.int) val query = Filter(TrueLiteral, - SubqueryAlias("c", 
SubqueryAlias("b", SubqueryAlias("a", input, None), None), None)) + SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None)(), None)(), None)()) comparePlans( afterOptimization(query), Filter(TrueLiteral, LocalRelation('a.int, 'b.int))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index 087718b3ecf1a..18c4da9595521 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -129,15 +129,15 @@ class JoinOptimizationSuite extends PlanTest { val query = Project(Seq($"x.key", $"y.key"), Join( - SubqueryAlias("x", input, None), - BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze + SubqueryAlias("x", input, None)(), + BroadcastHint(SubqueryAlias("y", input, None)()), Cross, None)).analyze val optimized = Optimize.execute(query) val expected = Join( - Project(Seq($"x.key"), SubqueryAlias("x", input, None)), - BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)()), + BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None)())), Cross, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 7400f3430e99c..2ab71151c9397 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -79,7 +79,7 @@ class PlanParserSuite extends PlanTest { def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = { val ctes = namedPlans.map { case (name, cte) => - name -> SubqueryAlias(name, cte, None) + name -> SubqueryAlias(name, cte, None)() } With(plan, ctes) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index eb2b20afc37cf..6d3464a8ac908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1039,7 +1039,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def as(alias: String): Dataset[T] = withTypedPlan { - SubqueryAlias(alias, logicalPlan, None) + SubqueryAlias(alias, logicalPlan, None)() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala index 6f821f80cc4c5..4c562e277ea65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala @@ -75,7 +75,7 @@ class SQLBuilder private ( val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map { case (attr, name) => Alias(attr.withQualifier(None), name)() } - val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)) + val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)()) try { val replaced = finalPlan.transformAllExpressions { @@ -563,7 +563,7 @@ class SQLBuilder private ( } private def addSubquery(plan: LogicalPlan): SubqueryAlias = { - 
SubqueryAlias(newSubqueryName(), plan, None) + SubqueryAlias(newSubqueryName(), plan, None)() } private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index bbcd9c4ef564c..d3dc1f023ef5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -19,14 +19,13 @@ package org.apache.spark.sql.execution.command import scala.util.control.NonFatal -import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} -import org.apache.spark.sql.types.{MetadataBuilder, StructType} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.types.MetadataBuilder /** @@ -131,6 +130,18 @@ case class CreateViewCommand( s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).") } + // When creating a permanent view, not allowed to reference temporary objects. For example, + // temporary views. + // TODO: Disallow creating permanent views based on temporary UDFs + if (!isTemporary) { + analyzedPlan.collectFirst { + case s: SubqueryAlias if s.isGeneratedByTempTable => + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temp view `${s.alias}`. 
" + + originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) + } + } + val aliasedPlan = if (userSpecifiedColumns.isEmpty) { analyzedPlan } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 4647b11af4dfb..841c9dbfc8fa1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -55,7 +55,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { s"${u.tableIdentifier.database.get}") } val plan = LogicalRelation(dataSource.resolveRelation()) - u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan) + u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)()).getOrElse(plan) } catch { case e: ClassNotFoundException => u case e: Exception => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8e5fc88aad448..8eee914243c8e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -106,21 +106,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (DDLUtils.isDatasourceTable(table)) { val dataSourceTable = cachedDataSourceTables(qualifiedTableName) - val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None) + val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)() // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. 
- alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) + alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable) } else if (table.tableType == CatalogTableType.VIEW) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) SubqueryAlias( alias.getOrElse(table.identifier.table), sparkSession.sessionState.sqlParser.parsePlan(viewText), - Option(table.identifier)) + Option(table.identifier))() } else { val qualifiedTable = MetastoreRelation( qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession) - alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) + alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable) } } @@ -312,7 +312,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Read path case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => val parquetRelation = convertToParquetRelation(relation) - SubqueryAlias(relation.tableName, parquetRelation, None) + SubqueryAlias(relation.tableName, parquetRelation, None)() } } } @@ -350,7 +350,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Read path case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => val orcRelation = convertToOrcRelation(relation) - SubqueryAlias(relation.tableName, orcRelation, None) + SubqueryAlias(relation.tableName, orcRelation, None)() } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 4f2910abfd216..4c056051aa41c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -61,7 +61,7 @@ private[sql] class HiveSessionCatalog( if (db == globalTempViewManager.database) { val relationAlias = alias.getOrElse(table) globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name)) + SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempTables.contains(table)) { val database = name.database.map(formatDatabaseName) @@ -69,10 +69,10 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.lookupRelation(newName, alias) } else { val relation = tempTables(table) - val tableWithQualifiers = SubqueryAlias(table, relation, None) + val tableWithQualifiers = SubqueryAlias(table, relation, None)(isGeneratedByTempTable = true) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. 
- alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers) + alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)()).getOrElse(tableWithQualifiers) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 2af935da689c9..0192daf63ddad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -38,21 +38,38 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { spark.sql(s"DROP TABLE IF EXISTS jt") } - test("nested views (interleaved with temporary views)") { - withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") { + test("create a persistent view on a persistent view") { + withView("jtv1", "jtv2") { sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6") checkAnswer(sql("select count(*) FROM jtv2"), Row(2)) + } + } + + test("create a temp view on a persistent view") { + withView("jtv1", "temp_jtv1") { + sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") + sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6") + checkAnswer(sql("select count(*) FROM temp_jtv1"), Row(2)) + } + } - // Checks temporary views + test("create a temp view on a temp view") { + withView("temp_jtv1", "temp_jtv2") { sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6") checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2)) + } + } - // Checks interleaved temporary view and normal view - sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3") - sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6") - checkAnswer(sql("select count(*) FROM jtv3"), Row(2)) + test("create a persistent view on a temp view") { + withView("jtv1", "temp_jtv1") { + sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") + val e = intercept[AnalysisException] { + sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6") + }.getMessage + assert(e.contains( + s"Not allowed to create a permanent view `jtv1` by referencing a temp view `temp_jtv1`")) } } From 1b430bb366d1bd2e567f87c9b640e7c88ffbb144 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 3 Nov 2016 21:25:18 -0700 Subject: [PATCH 2/9] fix2 --- .../sql/catalyst/catalog/SessionCatalog.scala | 18 ++++++ .../spark/sql/execution/command/views.scala | 10 +++ .../sql/hive/execution/SQLViewSuite.scala | 61 +++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index d3f08e8ab9990..6eab2f533db7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -923,6 +923,24 @@ class SessionCatalog( } } + /** + * Returns whether it is a temporary function. 
+ */ + def isTempFunction(name: FunctionIdentifier): Boolean = { + // copied from HiveSessionCatalog + val hiveFunctions = Seq( + "hash", + "histogram_numeric", + "percentile") + + // A temporary function is a function that has been registered in functionRegistry + // without a database name, and is neither a built-in function nor a Hive function + name.database.isEmpty && + functionRegistry.functionExists(name.funcName) && + !FunctionRegistry.builtin.functionExists(name.funcName) && + !hiveFunctions.contains(name.funcName.toLowerCase) + } + protected def failFunctionLookup(name: String): Nothing = { throw new NoSuchFunctionException(db = currentDb, func = name) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index d3dc1f023ef5f..fbc53b1d18471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -140,6 +141,15 @@ case class CreateViewCommand( s"referencing a temp view `${s.alias}`. " + originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) } + + child.collect { + case other if !other.resolved => other.expressions.flatMap(_.collect { + case e: UnresolvedFunction if sparkSession.sessionState.catalog.isTempFunction(e.name) => + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temp function `${e.name}`. 
" + + originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) + }) + } } val aliasedPlan = if (userSpecifiedColumns.isEmpty) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 0192daf63ddad..1780a06c5b2c8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -479,4 +479,65 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + test("create a persistent/temp view using a hive function") { + withView("view1", "tempView1") { + sql(s"CREATE VIEW tempView1 AS SELECT histogram_numeric(id, 5) from jt") + checkAnswer(sql("select count(*) FROM tempView1"), Row(1)) + sql(s"CREATE VIEW view1 AS SELECT histogram_numeric(id, 5) from jt") + checkAnswer(sql("select count(*) FROM view1"), Row(1)) + } + } + + test("create a persistent/temp view using a built-in function") { + withView("view1", "tempView1") { + sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT abs(id) from jt") + checkAnswer(sql("select count(*) FROM tempView1"), Row(9)) + sql(s"CREATE VIEW view1 AS SELECT abs(id) from jt") + checkAnswer(sql("select count(*) FROM view1"), Row(9)) + } + } + + test("create a persistent/temp view using a persistent function") { + val functionName = "myUpper" + val functionClass = + classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName + withUserDefinedFunction(functionName -> false) { + sql(s"CREATE FUNCTION $functionName AS '$functionClass'") + withView("view1", "tempView1") { + withTable("tab1") { + (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1") + sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $functionName(id) from tab1") + checkAnswer(sql("select count(*) FROM tempView1"), Row(10)) + sql(s"CREATE VIEW view1 AS SELECT $functionName(id) from tab1") + checkAnswer(sql("select count(*) FROM view1"), Row(10)) + } + } + } + } + + test("create a persistent/temp view using a temporary function") { + val tempFunctionName = "temp" + val functionClass = + classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName + withUserDefinedFunction(tempFunctionName -> true) { + sql(s"CREATE TEMPORARY FUNCTION $tempFunctionName AS '$functionClass'") + withView("view1", "tempView1") { + withTable("tab1") { + (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1") + + // temporary view + sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $tempFunctionName(id) from tab1") + checkAnswer(sql("select count(*) FROM tempView1"), Row(10)) + + // persistent view + val e = intercept[AnalysisException] { + sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1") + }.getMessage + assert(e.contains("Not allowed to create a permanent view `view1` by referencing " + + s"a temp function `$tempFunctionName`")) + } + } + } + } } From 695110f35df363286aad015ca591289e171ed185 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 3 Nov 2016 21:28:00 -0700 Subject: [PATCH 3/9] update code comments --- .../org/apache/spark/sql/execution/command/views.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index fbc53b1d18471..040577e57c37c 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -131,10 +131,9 @@ case class CreateViewCommand( s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).") } - // When creating a permanent view, not allowed to reference temporary objects. For example, - // temporary views. - // TODO: Disallow creating permanent views based on temporary UDFs + // When creating a permanent view, not allowed to reference temporary objects. if (!isTemporary) { + // Disallow creating permanent views based on temporary views. analyzedPlan.collectFirst { case s: SubqueryAlias if s.isGeneratedByTempTable => throw new AnalysisException(s"Not allowed to create a permanent view $name by " + @@ -142,6 +141,7 @@ case class CreateViewCommand( originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) } + // Disallow creating permanent views based on temporary UDFs. child.collect { case other if !other.resolved => other.expressions.flatMap(_.collect { case e: UnresolvedFunction if sparkSession.sessionState.catalog.isTempFunction(e.name) => From 4dbd3b6f328ec4d686331c31bba340192ddf91df Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 3 Nov 2016 23:12:55 -0700 Subject: [PATCH 4/9] address comments --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/catalog/SessionCatalog.scala | 8 +++--- .../spark/sql/catalyst/dsl/package.scala | 4 +-- .../sql/catalyst/optimizer/subquery.scala | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 4 +-- .../plans/logical/basicLogicalOperators.scala | 5 +--- .../sql/catalyst/analysis/AnalysisSuite.scala | 4 +-- .../catalog/SessionCatalogSuite.scala | 12 ++++---- .../optimizer/ColumnPruningSuite.scala | 8 +++--- .../EliminateSubqueryAliasesSuite.scala | 6 ++-- .../optimizer/JoinOptimizationSuite.scala | 8 +++--- .../sql/catalyst/parser/PlanParserSuite.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../spark/sql/catalyst/SQLBuilder.scala | 4 +-- .../spark/sql/execution/command/views.scala | 20 ++++++------- .../sql/execution/datasources/rules.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 ++++---- .../spark/sql/hive/HiveSessionCatalog.scala | 7 +++-- .../sql/hive/execution/SQLViewSuite.scala | 28 ++++++++++++------- 19 files changed, 72 insertions(+), 68 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 91883381e1d55..5011f2fdbf9b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -134,7 +134,7 @@ class Analyzer( case u : UnresolvedRelation => val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table)) .map(_._2).map { relation => - val withAlias = u.alias.map(SubqueryAlias(_, relation, None)()) + val withAlias = u.alias.map(SubqueryAlias(_, relation, None)) withAlias.getOrElse(relation) } substituted.getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6eab2f533db7d..f214fa7c16d0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -553,16 +553,16 @@ class SessionCatalog( val relationAlias = alias.getOrElse(table) if (db == globalTempViewManager.database) { globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true) + SubqueryAlias(relationAlias, viewDef, Some(name)) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempTables.contains(table)) { val metadata = externalCatalog.getTable(db, table) val view = Option(metadata.tableType).collect { case CatalogTableType.VIEW => name } - SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)() + SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view) } else { - SubqueryAlias(relationAlias, tempTables(table), Option(name))(isGeneratedByTempTable = true) + SubqueryAlias(relationAlias, tempTables(table), Option(name)) } } } @@ -926,7 +926,7 @@ class SessionCatalog( /** * Returns whether it is a temporary function. */ - def isTempFunction(name: FunctionIdentifier): Boolean = { + def isTemporaryFunction(name: FunctionIdentifier): Boolean = { // copied from HiveSessionCatalog val hiveFunctions = Seq( "hash", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 0be6fcb81f6da..e901683be6854 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -347,7 +347,7 @@ package object dsl { orderSpec: Seq[SortOrder]): LogicalPlan = Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) - def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)() + def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None) def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan) @@ -371,7 +371,7 @@ package object dsl { def as(alias: String): LogicalPlan = logicalPlan match { case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias)) - case plan => SubqueryAlias(alias, plan, None)() + case plan => SubqueryAlias(alias, plan, None) } def repartition(num: Integer): LogicalPlan = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index f59b4e395d991..f14aaab72a98f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -293,7 +293,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] { case Project(projList, _) => subqueryRoot = Project(projList ++ havingInputs, subqueryRoot) case s @ SubqueryAlias(alias, _, None) => - subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)() + subqueryRoot = SubqueryAlias(alias, subqueryRoot, None) case op => sys.error(s"Unexpected operator $op in corelated subquery") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index fbf75582ee79a..4b151c81d8f8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -108,7 +108,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * This is only used for Common Table Expressions. */ override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) { - SubqueryAlias(ctx.name.getText, plan(ctx.query), None)() + SubqueryAlias(ctx.name.getText, plan(ctx.query), None) } /** @@ -729,7 +729,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { * Create an alias (SubqueryAlias) for a LogicalPlan. */ private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = { - SubqueryAlias(alias.getText, plan, None)() + SubqueryAlias(alias.getText, plan, None) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 96e6cee3ed770..65ceab2ce27b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -709,13 +709,10 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo case class SubqueryAlias( alias: String, child: LogicalPlan, - view: Option[TableIdentifier])( - val isGeneratedByTempTable: java.lang.Boolean = false) + view: Option[TableIdentifier]) extends UnaryNode { override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) - - override protected def otherCopyArgs: Seq[AnyRef] = isGeneratedByTempTable :: Nil } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 45372fbb29b14..817de48de2798 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -369,8 +369,8 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers { val query = Project(Seq($"x.key", $"y.key"), Join( - Project(Seq($"x.key"), SubqueryAlias("x", input, None)()), - Project(Seq($"y.key"), SubqueryAlias("y", input, None)()), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)), + Project(Seq($"y.key"), SubqueryAlias("y", input, None)), Cross, None)) assertAnalysisSuccess(query) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index e3198c75bc3e8..b77fef225a0c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -406,24 +406,24 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) - == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)()) + == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)) // Otherwise, we'll first look up a temporary table with the same name assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - 
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))()) + == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))) // Then, if that does not exist, look up the relation in the current database sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)()) + == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)) } test("lookup table relation with alias") { val catalog = new SessionCatalog(newBasicCatalog()) val alias = "monster" val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) - val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)() + val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None) val relationWithAlias = SubqueryAlias(alias, - SimpleCatalogRelation("db2", tableMetadata), None)() + SimpleCatalogRelation("db2", tableMetadata), None) assert(catalog.lookupRelation( TableIdentifier("tbl1", Some("db2")), alias = None) == relation) assert(catalog.lookupRelation( @@ -435,7 +435,7 @@ class SessionCatalogSuite extends SparkFunSuite { val tmpView = Range(1, 10, 2, 10) catalog.createTempView("vw1", tmpView, overrideIfExists = false) val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range")) - assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))()) + assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))) } test("table exists") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 854e759afa4ef..5bd1bc80c3b8a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -320,16 +320,16 @@ class ColumnPruningSuite extends PlanTest { val query = Project(Seq($"x.key", $"y.key"), Join( - SubqueryAlias("x", input, None)(), - BroadcastHint(SubqueryAlias("y", input, None)()), Inner, None)).analyze + SubqueryAlias("x", input, None), + BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze val optimized = Optimize.execute(query) val expected = Join( - Project(Seq($"x.key"), SubqueryAlias("x", input, None)()), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)), BroadcastHint( - Project(Seq($"y.key"), SubqueryAlias("y", input, None)())), + Project(Seq($"y.key"), SubqueryAlias("y", input, None))), Inner, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala index 35efaf8465698..a8aeedbd62759 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala @@ -46,13 +46,13 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { test("eliminate top level subquery") { val input = LocalRelation('a.int, 'b.int) - val query = SubqueryAlias("a", input, None)() + val query = SubqueryAlias("a", input, None) 
comparePlans(afterOptimization(query), input) } test("eliminate mid-tree subquery") { val input = LocalRelation('a.int, 'b.int) - val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)()) + val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)) comparePlans( afterOptimization(query), Filter(TrueLiteral, LocalRelation('a.int, 'b.int))) @@ -61,7 +61,7 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { test("eliminate multiple subqueries") { val input = LocalRelation('a.int, 'b.int) val query = Filter(TrueLiteral, - SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None)(), None)(), None)()) + SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None), None), None)) comparePlans( afterOptimization(query), Filter(TrueLiteral, LocalRelation('a.int, 'b.int))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index 18c4da9595521..087718b3ecf1a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -129,15 +129,15 @@ class JoinOptimizationSuite extends PlanTest { val query = Project(Seq($"x.key", $"y.key"), Join( - SubqueryAlias("x", input, None)(), - BroadcastHint(SubqueryAlias("y", input, None)()), Cross, None)).analyze + SubqueryAlias("x", input, None), + BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze val optimized = Optimize.execute(query) val expected = Join( - Project(Seq($"x.key"), SubqueryAlias("x", input, None)()), - BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None)())), + Project(Seq($"x.key"), SubqueryAlias("x", input, None)), + BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))), Cross, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 2ab71151c9397..7400f3430e99c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -79,7 +79,7 @@ class PlanParserSuite extends PlanTest { def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = { val ctes = namedPlans.map { case (name, cte) => - name -> SubqueryAlias(name, cte, None)() + name -> SubqueryAlias(name, cte, None) } With(plan, ctes) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6d3464a8ac908..eb2b20afc37cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1039,7 +1039,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def as(alias: String): Dataset[T] = withTypedPlan { - SubqueryAlias(alias, logicalPlan, None)() + SubqueryAlias(alias, logicalPlan, None) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala index 4c562e277ea65..6f821f80cc4c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala @@ -75,7 +75,7 @@ class SQLBuilder private ( val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map { case (attr, name) => Alias(attr.withQualifier(None), name)() } - val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)()) + val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)) try { val replaced = finalPlan.transformAllExpressions { @@ -563,7 +563,7 @@ class SQLBuilder private ( } private def addSubquery(plan: LogicalPlan): SubqueryAlias = { - SubqueryAlias(newSubqueryName(), plan, None)() + SubqueryAlias(newSubqueryName(), plan, None) } private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 040577e57c37c..e20a6f8e66d23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -18,10 +18,9 @@ package org.apache.spark.sql.execution.command import scala.util.control.NonFatal - import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction +import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -133,18 +132,17 @@ case class CreateViewCommand( // When creating a permanent view, not allowed to reference temporary objects. if (!isTemporary) { - // Disallow creating permanent views based on temporary views. - analyzedPlan.collectFirst { - case s: SubqueryAlias if s.isGeneratedByTempTable => + child.collect { + // Disallow creating permanent views based on temporary views. + case s: UnresolvedRelation + if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) => throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temp view `${s.alias}`. " + + s"referencing a temp view ${s.tableIdentifier}. " + originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) - } - - // Disallow creating permanent views based on temporary UDFs. - child.collect { case other if !other.resolved => other.expressions.flatMap(_.collect { - case e: UnresolvedFunction if sparkSession.sessionState.catalog.isTempFunction(e.name) => + // Disallow creating permanent views based on temporary UDFs. + case e: UnresolvedFunction + if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) => throw new AnalysisException(s"Not allowed to create a permanent view $name by " + s"referencing a temp function `${e.name}`. 
" + originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 841c9dbfc8fa1..4647b11af4dfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -55,7 +55,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { s"${u.tableIdentifier.database.get}") } val plan = LogicalRelation(dataSource.resolveRelation()) - u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)()).getOrElse(plan) + u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan) } catch { case e: ClassNotFoundException => u case e: Exception => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8eee914243c8e..8e5fc88aad448 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -106,21 +106,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (DDLUtils.isDatasourceTable(table)) { val dataSourceTable = cachedDataSourceTables(qualifiedTableName) - val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)() + val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None) // Then, if alias is specified, wrap the table with a Subquery using the alias. // Otherwise, wrap the table with a Subquery using the table name. - alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable) + alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) } else if (table.tableType == CatalogTableType.VIEW) { val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) SubqueryAlias( alias.getOrElse(table.identifier.table), sparkSession.sessionState.sqlParser.parsePlan(viewText), - Option(table.identifier))() + Option(table.identifier)) } else { val qualifiedTable = MetastoreRelation( qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession) - alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable) + alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) } } @@ -312,7 +312,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Read path case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => val parquetRelation = convertToParquetRelation(relation) - SubqueryAlias(relation.tableName, parquetRelation, None)() + SubqueryAlias(relation.tableName, parquetRelation, None) } } } @@ -350,7 +350,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Read path case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => val orcRelation = convertToOrcRelation(relation) - SubqueryAlias(relation.tableName, orcRelation, None)() + SubqueryAlias(relation.tableName, orcRelation, None) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 4c056051aa41c..db42a7ac28598 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -61,7 +61,7 @@ private[sql] class HiveSessionCatalog( if (db == globalTempViewManager.database) { val relationAlias = alias.getOrElse(table) globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true) + SubqueryAlias(relationAlias, viewDef, Some(name)) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempTables.contains(table)) { val database = name.database.map(formatDatabaseName) @@ -69,10 +69,10 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.lookupRelation(newName, alias) } else { val relation = tempTables(table) - val tableWithQualifiers = SubqueryAlias(table, relation, None)(isGeneratedByTempTable = true) + val tableWithQualifiers = SubqueryAlias(table, relation, None) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)()).getOrElse(tableWithQualifiers) + alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers) } } @@ -232,6 +232,7 @@ private[sql] class HiveSessionCatalog( // current_user, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field, // in_file, index, matchpath, ngrams, noop, noopstreaming, noopwithmap, // noopwithmapstreaming, parse_url_tuple, reflect2, windowingtablefunction. + // Note: don't forget to update SessionCatalog.isTemporaryFunction private val hiveFunctions = Seq( "hash", "histogram_numeric", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 1780a06c5b2c8..a2e5f2c2fa31f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -38,7 +38,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { spark.sql(s"DROP TABLE IF EXISTS jt") } - test("create a persistent view on a persistent view") { + test("create a permanent view on a permanent view") { withView("jtv1", "jtv2") { sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6") @@ -46,7 +46,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a temp view on a persistent view") { + test("create a temp view on a permanent view") { withView("jtv1", "temp_jtv1") { sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6") @@ -62,14 +62,22 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a persistent view on a temp view") { - withView("jtv1", "temp_jtv1") { + test("create a permanent view on a temp view") { + withView("jtv1", "temp_jtv1", "global_temp_jtv1") { sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") - val e = intercept[AnalysisException] { + var e = intercept[AnalysisException] { sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6") }.getMessage assert(e.contains( s"Not allowed to create a permanent view `jtv1` by referencing a temp view `temp_jtv1`")) + + val globalTempDB = 
spark.sharedState.globalTempViewManager.database + sql("CREATE GLOBAL TEMP VIEW global_temp_jtv1 AS SELECT * FROM jt WHERE id > 0") + e = intercept[AnalysisException] { + sql(s"CREATE VIEW jtv1 AS SELECT * FROM $globalTempDB.global_temp_jtv1 WHERE id < 6") + }.getMessage + assert(e.contains(s"Not allowed to create a permanent view `jtv1` by referencing " + + s"a temp view `global_temp`.`global_temp_jtv1`")) } } @@ -480,7 +488,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a persistent/temp view using a hive function") { + test("create a permanent/temp view using a hive function") { withView("view1", "tempView1") { sql(s"CREATE VIEW tempView1 AS SELECT histogram_numeric(id, 5) from jt") checkAnswer(sql("select count(*) FROM tempView1"), Row(1)) @@ -489,7 +497,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a persistent/temp view using a built-in function") { + test("create a permanent/temp view using a built-in function") { withView("view1", "tempView1") { sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT abs(id) from jt") checkAnswer(sql("select count(*) FROM tempView1"), Row(9)) @@ -498,7 +506,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a persistent/temp view using a persistent function") { + test("create a permanent/temp view using a permanent function") { val functionName = "myUpper" val functionClass = classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName @@ -516,7 +524,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a persistent/temp view using a temporary function") { + test("create a permanent/temp view using a temporary function") { val tempFunctionName = "temp" val functionClass = classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName @@ -530,7 +538,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $tempFunctionName(id) from tab1") checkAnswer(sql("select count(*) FROM tempView1"), Row(10)) - // persistent view + // permanent view val e = intercept[AnalysisException] { sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1") }.getMessage From 7100a8fcb3975d5f00fcc9e485f3606873fd1a0d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 3 Nov 2016 23:13:44 -0700 Subject: [PATCH 5/9] fix style --- .../scala/org/apache/spark/sql/execution/command/views.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index e20a6f8e66d23..717834b4b3a95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.execution.command import scala.util.control.NonFatal + import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.QueryPlan -import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.types.MetadataBuilder From 86e7f9d8afdaed72666b79692a31a9131461dc29 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 4 Nov 2016 00:19:49 -0700 Subject: [PATCH 6/9] resolve comments. --- .../spark/sql/execution/command/views.scala | 48 ++++++++++++------- .../sql/hive/execution/SQLViewSuite.scala | 12 ++--- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 717834b4b3a95..30472ec45ce44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -132,24 +132,8 @@ case class CreateViewCommand( } // When creating a permanent view, not allowed to reference temporary objects. - if (!isTemporary) { - child.collect { - // Disallow creating permanent views based on temporary views. - case s: UnresolvedRelation - if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) => - throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temp view ${s.tableIdentifier}. " + - originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) - case other if !other.resolved => other.expressions.flatMap(_.collect { - // Disallow creating permanent views based on temporary UDFs. - case e: UnresolvedFunction - if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) => - throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temp function `${e.name}`. " + - originalText.map(sql => s"""SQL: "$sql".""").getOrElse("")) - }) - } - } + // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved). + verifyTemporaryObjectsNotExists(sparkSession) val aliasedPlan = if (userSpecifiedColumns.isEmpty) { analyzedPlan @@ -192,6 +176,34 @@ case class CreateViewCommand( Seq.empty[Row] } + /** + * Permanent views are not allowed to reference temp objects, including temp functions and views. + */ + private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = { + if (!isTemporary) { + // This function traverses the unresolved plan `child`, for two reasons: + // 1) The Analyzer replaces an unresolved temporary view with a SubqueryAlias over the + // corresponding logical plan; after the replacement, it is impossible to tell whether a + // SubqueryAlias was generated from a temporary view. + // 2) Temp functions are represented by multiple classes, most of which are inaccessible + // from this package (e.g., HiveGenericUDF). + child.collect { + // Disallow creating permanent views based on temporary views. + case s: UnresolvedRelation + if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) => + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temporary view ${s.tableIdentifier}") + case other if !other.resolved => other.expressions.flatMap(_.collect { + // Disallow creating permanent views based on temporary UDFs.
+ case e: UnresolvedFunction + if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) => + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temporary function `${e.name}`") + }) + } + } + } + /** * Returns a [[CatalogTable]] that can be used to save in the catalog. This method canonicalizes * SQL based on the analyzed plan, and also creates the proper schema for the view. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index a2e5f2c2fa31f..4fbc2fb638c8e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -68,8 +68,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { var e = intercept[AnalysisException] { sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6") }.getMessage - assert(e.contains( - s"Not allowed to create a permanent view `jtv1` by referencing a temp view `temp_jtv1`")) + assert(e.contains("Not allowed to create a permanent view `jtv1` by " + + "referencing a temporary view `temp_jtv1`")) val globalTempDB = spark.sharedState.globalTempViewManager.database sql("CREATE GLOBAL TEMP VIEW global_temp_jtv1 AS SELECT * FROM jt WHERE id > 0") @@ -77,7 +77,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql(s"CREATE VIEW jtv1 AS SELECT * FROM $globalTempDB.global_temp_jtv1 WHERE id < 6") }.getMessage assert(e.contains(s"Not allowed to create a permanent view `jtv1` by referencing " + - s"a temp view `global_temp`.`global_temp_jtv1`")) + s"a temporary view `global_temp`.`global_temp_jtv1`")) } } @@ -464,7 +464,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("SPARK-14933 - create view from hive parquet tabale") { + test("SPARK-14933 - create view from hive parquet table") { withTable("t_part") { withView("v_part") { spark.sql("create table t_part stored as parquet as select 1 as a, 2 as b") @@ -476,7 +476,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("SPARK-14933 - create view from hive orc tabale") { + test("SPARK-14933 - create view from hive orc table") { withTable("t_orc") { withView("v_orc") { spark.sql("create table t_orc stored as orc as select 1 as a, 2 as b") @@ -543,7 +543,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1") }.getMessage assert(e.contains("Not allowed to create a permanent view `view1` by referencing " + - s"a temp function `$tempFunctionName`")) + s"a temporary function `$tempFunctionName`")) } } } From a4df82b255089e9daf4167846186f25879a60da3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 4 Nov 2016 09:47:10 -0700 Subject: [PATCH 7/9] resolve comments.
--- .../catalog/SessionCatalogSuite.scala | 28 ++++++++++ .../sql/hive/execution/SQLViewSuite.scala | 53 ++++++++----------- 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index b77fef225a0c8..001d9c47785d2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -919,6 +919,34 @@ class SessionCatalogSuite extends SparkFunSuite { catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(arguments.length)) } + test("isTemporaryFunction") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + + // Returns false when the function does not exist + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1"))) + + val tempFunc1 = (e: Seq[Expression]) => e.head + val info1 = new ExpressionInfo("tempFunc1", "temp1") + sessionCatalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) + + // Returns true when the function is temporary + assert(sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1"))) + + // Returns false when the function is permanent + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1", Some("db2")))) + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("db2.func1"))) + sessionCatalog.setCurrentDatabase("db2") + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1"))) + + // Returns false when the function is built-in or hive + assert(FunctionRegistry.builtin.functionExists("sum")) + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("sum"))) + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("histogram_numeric"))) + assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("percentile"))) + } + test("drop function") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 4fbc2fb638c8e..0da61566af5c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -488,37 +488,30 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("create a permanent/temp view using a hive function") { - withView("view1", "tempView1") { - sql(s"CREATE VIEW tempView1 AS SELECT histogram_numeric(id, 5) from jt") - checkAnswer(sql("select count(*) FROM tempView1"), Row(1)) - sql(s"CREATE VIEW view1 AS SELECT histogram_numeric(id, 5) from jt") - checkAnswer(sql("select count(*) FROM view1"), Row(1)) - } - } - - test("create a permanent/temp view using a built-in function") { - withView("view1", "tempView1") { - sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT abs(id) from jt") - checkAnswer(sql("select count(*) FROM tempView1"), Row(9)) - sql(s"CREATE VIEW view1 AS SELECT abs(id) from jt") - checkAnswer(sql("select count(*) FROM view1"), Row(9)) - } - } - - test("create a permanent/temp view using a permanent function") { - val 
functionName = "myUpper" - val functionClass = + test("create a permanent/temp view using a hive, built-in, and permanent user function") { + val permanentFuncName = "myUpper" + val permanentFuncClass = classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName - withUserDefinedFunction(functionName -> false) { - sql(s"CREATE FUNCTION $functionName AS '$functionClass'") - withView("view1", "tempView1") { - withTable("tab1") { - (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1") - sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $functionName(id) from tab1") - checkAnswer(sql("select count(*) FROM tempView1"), Row(10)) - sql(s"CREATE VIEW view1 AS SELECT $functionName(id) from tab1") - checkAnswer(sql("select count(*) FROM view1"), Row(10)) + val builtInFuncName = "abs" + val hiveFuncName = "histogram_numeric" + + withUserDefinedFunction(permanentFuncName -> false) { + sql(s"CREATE FUNCTION $permanentFuncName AS '$permanentFuncClass'") + withTable("tab1") { + (1 to 10).map(i => (s"$i", i)).toDF("str", "id").write.saveAsTable("tab1") + Seq("VIEW", "TEMPORARY VIEW").foreach { viewMode => + withView("view1") { + sql( + s""" + |CREATE $viewMode view1 + |AS SELECT + |$permanentFuncName(str), + |$builtInFuncName(id), + |$hiveFuncName(id, 5) over() + |FROM tab1 + """.stripMargin) + checkAnswer(sql("select count(*) FROM view1"), Row(10)) + } } } } From 1c3899f8092d2f568391423df27cac77ee58ffca Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 4 Nov 2016 14:18:46 -0700 Subject: [PATCH 8/9] add comments. --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f214fa7c16d0f..c8b61d8df3585 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -924,7 +924,7 @@ class SessionCatalog( } /** - * Returns whether it is a temporary function. + * Returns whether it is a temporary function. Returns false if the function does not exist.
*/ def isTemporaryFunction(name: FunctionIdentifier): Boolean = { // copied from HiveSessionCatalog From fec0066331aaf085f8cd0318be23844560e72b4b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 6 Nov 2016 09:33:20 -0800 Subject: [PATCH 9/9] address comments --- .../org/apache/spark/sql/hive/execution/SQLViewSuite.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 0da61566af5c0..ba65db71ede7f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -492,7 +492,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val permanentFuncName = "myUpper" val permanentFuncClass = classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName - val builtInFuncName = "abs" + val builtInFuncNameInLowerCase = "abs" + val builtInFuncNameInMixedCase = "aBs" val hiveFuncName = "histogram_numeric" withUserDefinedFunction(permanentFuncName -> false) { @@ -506,7 +507,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |CREATE $viewMode view1 |AS SELECT |$permanentFuncName(str), - |$builtInFuncName(id), + |$builtInFuncNameInLowerCase(id), + |$builtInFuncNameInMixedCase(id) as aBs, |$hiveFuncName(id, 5) over() |FROM tab1 """.stripMargin)
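
For illustration, a minimal spark-shell sketch of the behavior this patch series enforces: a permanent view may no longer reference a temporary object, while temporary views are unaffected. The names `my_temp`, `my_temp2`, and `my_perm` are hypothetical, and the AnalysisException line follows the message built in CreateViewCommand.verifyTemporaryObjectsNotExists (the exact rendering in the shell may differ):

    scala> spark.range(10).createOrReplaceTempView("my_temp")

    scala> spark.sql("CREATE VIEW my_perm AS SELECT * FROM my_temp")
    org.apache.spark.sql.AnalysisException: Not allowed to create a permanent view `my_perm` by referencing a temporary view `my_temp`

    scala> spark.sql("CREATE TEMPORARY VIEW my_temp2 AS SELECT * FROM my_temp")  // succeeds; the check applies only to permanent views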