diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6da99ce0dd683..c9cf5bbcdfe7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -42,8 +42,8 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} -import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand} -import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} +import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, TemporaryView} +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} @@ -2429,7 +2429,7 @@ class Dataset[T] private[sql]( */ @throws[AnalysisException] def createTempView(viewName: String): Unit = withPlan { - createViewCommand(viewName, replace = false) + createTempViewCommand(viewName, replace = false) } /** @@ -2440,10 +2440,10 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ def createOrReplaceTempView(viewName: String): Unit = withPlan { - createViewCommand(viewName, replace = true) + createTempViewCommand(viewName, replace = true) } - private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = { + private def createTempViewCommand(viewName: String, replace: Boolean): CreateViewCommand = { CreateViewCommand( name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), userSpecifiedColumns = Nil, @@ -2451,9 +2451,8 @@ class Dataset[T] private[sql]( properties = Map.empty, originalText = None, child = logicalPlan, - allowExisting = false, - replace = replace, - isTemporary = true) + if (replace) SaveMode.Overwrite else SaveMode.ErrorIfExists, + TemporaryView) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e32d30178eeb1..c3c0628a13f50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1254,6 +1254,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ic.identifier.getText -> Option(ic.STRING).map(string) } } + val viewType = if (ctx.TEMPORARY != null) TemporaryView else PermanentView createView( ctx, ctx.tableIdentifier, @@ -1261,15 +1262,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { userSpecifiedColumns, ctx.query, Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty), - allowExisting = ctx.EXISTS != null, + ignoreIfExists = ctx.EXISTS != null, replace = ctx.REPLACE != null, - isTemporary = ctx.TEMPORARY != null + viewType ) } } /** - * Alter the query of a view. This creates a [[CreateViewCommand]] command. + * Alter the query of a view. This creates a [[CreateViewCommand]] command. The view type could be + * either temporary or permanent. */ override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) { createView( @@ -1279,9 +1281,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { userSpecifiedColumns = Seq.empty, query = ctx.query, properties = Map.empty, - allowExisting = false, + ignoreIfExists = false, replace = true, - isTemporary = false) + AnyTypeView + ) } /** @@ -1294,10 +1297,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { userSpecifiedColumns: Seq[(String, Option[String])], query: QueryContext, properties: Map[String, String], - allowExisting: Boolean, + ignoreIfExists: Boolean, replace: Boolean, - isTemporary: Boolean): LogicalPlan = { + viewType: ViewType): LogicalPlan = { + val mode = (replace, ignoreIfExists) match { + case (true, false) => SaveMode.Overwrite + case (false, true) => SaveMode.Ignore + case (false, false) => SaveMode.ErrorIfExists + case _ => throw new ParseException( + "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.", ctx) + } val originalText = source(query) + CreateViewCommand( visitTableIdentifier(name), userSpecifiedColumns, @@ -1305,9 +1316,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { properties, Some(originalText), plan(query), - allowExisting = allowExisting, - replace = replace, - isTemporary = isTemporary) + mode, + viewType) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cda3b2b75e6b4..16d81ead9cdec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -443,8 +443,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { query) ExecutedCommandExec(cmd) :: Nil - case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil - case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index f0d7b64c3c160..da0868dbf9901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import scala.util.control.NonFatal -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} @@ -41,14 +41,12 @@ import org.apache.spark.sql.types.StructType * Dataset API. * @param child the logical plan that represents the view; this is used to generate a canonicalized * version of the SQL that can be saved in the catalog. - * @param allowExisting if true, and if the view already exists, noop; if false, and if the view - * already exists, throws analysis exception. - * @param replace if true, and if the view already exists, updates it; if false, and if the view - * already exists, throws analysis exception. - * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped - * at the end of current Spark session. Existing permanent relations with the same - * name are not visible to the current session while the temporary view exists, - * unless they are specified with full qualified table name with database prefix. + * @param mode only three modes are supported here: Ignore, Overwrite and ErrorIfExists. If the + * view already exists, CreateViewCommand behaves based on the mode: + * 1) Overwrite: update the view; + * 2) Ignore: noop; + * 3) ErrorIfExists throws analysis exception. + * @param viewType the type of this view. */ case class CreateViewCommand( name: TableIdentifier, @@ -57,9 +55,8 @@ case class CreateViewCommand( properties: Map[String, String], originalText: Option[String], child: LogicalPlan, - allowExisting: Boolean, - replace: Boolean, - isTemporary: Boolean) + mode: SaveMode, + viewType: ViewType) extends RunnableCommand { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) @@ -69,23 +66,17 @@ case class CreateViewCommand( override def output: Seq[Attribute] = Seq.empty[Attribute] - if (!isTemporary) { - require(originalText.isDefined, - "The table to created with CREATE VIEW must have 'originalText'.") - } - - if (allowExisting && replace) { - throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") - } + assert(mode != SaveMode.Append, + "CREATE or ALTER VIEW can only use ErrorIfExists, Ignore or Overwrite.") // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE' - if (allowExisting && isTemporary) { + if (mode == SaveMode.Ignore && viewType == TemporaryView) { throw new AnalysisException( "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.") } // Temporary view names should NOT contain database prefix like "database.table" - if (isTemporary && name.database.isDefined) { + if (viewType == TemporaryView && name.database.isDefined) { val database = name.database.get throw new AnalysisException( s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.") @@ -105,38 +96,40 @@ case class CreateViewCommand( } val sessionState = sparkSession.sessionState - if (isTemporary) { - createTemporaryView(sparkSession, analyzedPlan) - } else { - // Adds default database for permanent table if it doesn't exist, so that tableExists() - // only check permanent tables. - val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase) - val qualifiedName = name.copy(database = Option(database)) - - if (sessionState.catalog.tableExists(qualifiedName)) { - val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName) - if (allowExisting) { - // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view - // already exists. - } else if (tableMetadata.tableType != CatalogTableType.VIEW) { - throw new AnalysisException( - "Existing table is not a view. The following is an existing table, " + - s"not a view: $qualifiedName") - } else if (replace) { - // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) + viewType match { + case TemporaryView => + createTemporaryView(sparkSession, analyzedPlan) + case AnyTypeView if sessionState.catalog.isTemporaryTable(name) => + createTemporaryView(sparkSession, analyzedPlan) + case _ => + // Adds default database for permanent table if it doesn't exist, so that tableExists() + // only check permanent tables. + val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val qualifiedName = name.copy(database = Option(database)) + if (sessionState.catalog.tableExists(qualifiedName)) { + val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName) + if (mode == SaveMode.Ignore) { + // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target + // view already exists. + } else if (tableMetadata.tableType != CatalogTableType.VIEW) { + throw new AnalysisException( + "Existing table is not a view. The following is an existing table, " + + s"not a view: $qualifiedName") + } else if (mode == SaveMode.Overwrite) { + // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` + sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) + } else { + // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already + // exists. + throw new AnalysisException( + s"View $qualifiedName already exists. If you want to update the view definition, " + + "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") + } } else { - // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already - // exists. - throw new AnalysisException( - s"View $qualifiedName already exists. If you want to update the view definition, " + - "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") + // Create the view if it doesn't exist. + sessionState.catalog.createTable( + prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) } - } else { - // Create the view if it doesn't exist. - sessionState.catalog.createTable( - prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) - } } Seq.empty[Row] } @@ -154,7 +147,7 @@ case class CreateViewCommand( sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed } - catalog.createTempView(name.table, logicalPlan, replace) + catalog.createTempView(name.table, logicalPlan, mode == SaveMode.Overwrite) } /** @@ -202,3 +195,27 @@ case class CreateViewCommand( ) } } + +/** + * The trait used to represent the type of a view. + */ +sealed trait ViewType + +/** + * Temporary means local temporary views. The views are session-scoped and automatically dropped + * when the session terminates. Do not qualify a temporary table with a schema name. Existing + * permanent tables or views with the same name are not visible while the temporary view exists, + * unless they are referenced with schema-qualified names. + */ +case object TemporaryView extends ViewType + +/** + * Permanent means global permanent views. The views are global-scoped and accessible by all + * sessions. The permanent views stays until they are explicitly dropped. + */ +case object PermanentView extends ViewType + +/** + * Any means the view type is unknown. + */ +case object AnyTypeView extends ViewType diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 54e27b6f73502..1e42759f4be07 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -468,13 +468,27 @@ class HiveDDLCommandSuite extends PlanTest { test("create view -- basic") { val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1" val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] - assert(!command.allowExisting) + assert(command.mode == SaveMode.ErrorIfExists) + assert(command.viewType == PermanentView) assert(command.name.database.isEmpty) assert(command.name.table == "view1") assert(command.originalText == Some("SELECT * FROM tab1")) assert(command.userSpecifiedColumns.isEmpty) } + test("create view -- IF NOT EXISTS") { + val v1 = "CREATE VIEW IF NOT EXISTS view1 AS SELECT * FROM tab1" + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(command.mode == SaveMode.Ignore) + assert(command.viewType == PermanentView) + + val v2 = "CREATE OR REPLACE VIEW IF NOT EXISTS view1 AS SELECT * FROM tab1" + val e = intercept[ParseException] { + parser.parsePlan(v2).asInstanceOf[CreateViewCommand] + }.getMessage + assert(e.contains("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed")) + } + test("create view - full") { val v1 = """ @@ -485,6 +499,8 @@ class HiveDDLCommandSuite extends PlanTest { |AS SELECT * FROM tab1 """.stripMargin val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(command.mode == SaveMode.Overwrite) + assert(command.viewType == PermanentView) assert(command.name.database.isEmpty) assert(command.name.table == "view1") assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello"))) @@ -493,6 +509,32 @@ class HiveDDLCommandSuite extends PlanTest { assert(command.comment == Some("BLABLA")) } + test("create temporary view") { + val v1 = "CREATE TEMPORARY VIEW testView AS SELECT id FROM jt" + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(command.viewType == TemporaryView) + assert(command.mode == SaveMode.ErrorIfExists) + assert(command.name.database.isEmpty) + assert(command.name.table == "testView") + assert(command.originalText == Option("SELECT id FROM jt")) + assert(command.userSpecifiedColumns.isEmpty) + assert(command.comment.isEmpty) + assert(command.properties.isEmpty) + } + + test("alter view as select") { + val v1 = "ALTER VIEW testView AS SELECT id FROM jt" + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(command.mode == SaveMode.Overwrite) + assert(command.viewType == AnyTypeView) + assert(command.name.database.isEmpty) + assert(command.name.table == "testView") + assert(command.originalText == Option("SELECT id FROM jt")) + assert(command.userSpecifiedColumns.isEmpty) + assert(command.comment.isEmpty) + assert(command.properties.isEmpty) + } + test("create view -- partitioned view") { val v1 = "CREATE VIEW view1 partitioned on (ds, hr) as select * from srcpart" intercept[ParseException] { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 6a80664417911..d2fac04f64fdc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -201,13 +203,89 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withView("testView", "default.testView") { sql("CREATE VIEW testView AS SELECT id FROM jt") sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") + checkAnswer( + sql("SHOW TABLES 'testView*'"), + Row("testview", true) :: Row("testview", false) :: Nil) + } + + withView("testView", "default.testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt") + checkAnswer( + sql("SHOW TABLES 'testView*'"), + Row("testview", true) :: Row("testview", false) :: Nil) + } + + withView("testView", "default.testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") + checkAnswer( + sql("SHOW TABLES 'testView*'"), + Row("testview", false) :: Nil) } } - test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") { + test("ALTER VIEW: alter a temporary view when a permanent VIEW with same name exists") { + alterTempView(isTempAlteredView = true) + } + + test("ALTER VIEW: alter a persistent view when a temp VIEW with same name exists") { + alterTempView(isTempAlteredView = false) + } + + private def alterTempView (isTempAlteredView: Boolean) = { + withView("testView", "default.testView") { + val catalog = spark.sessionState.catalog + val oldViewQuery = "SELECT id FROM jt" + val newViewQuery = "SELECT id, id1 FROM jt" + sql(s"CREATE VIEW default.testView AS $oldViewQuery") + sql(s"CREATE TEMPORARY VIEW testView AS $oldViewQuery") + if (isTempAlteredView) { + // When the database is not specified, we will first try to alter the temporary view + sql(s"ALTER VIEW testView AS $newViewQuery") + } else { + // When the database is specified, we will try to alter the permanent view, no matter + // whether the temporary view with the same name exists or not. + sql(s"ALTER VIEW default.testView AS $newViewQuery") + } + + val persistentView = catalog.getTableMetadata( + TableIdentifier(table = "testView", database = Some("default"))) + assert(persistentView.tableType == CatalogTableType.VIEW) + val tempView = catalog.getTableMetadata(TableIdentifier("testView")) + assert(tempView.tableType == CatalogTableType.VIEW) + assert(tempView.viewOriginalText.isEmpty) + + if (isTempAlteredView) { + // View Text of the persistent view default.testView is not changed + assert(persistentView.viewOriginalText == Option(oldViewQuery)) + // temp view testView is changed + checkAnswer( + sql(newViewQuery), + sql("select * from testView")) + } else { + // View Text of the persistent view default.testView is changed + assert(persistentView.viewOriginalText == Option(newViewQuery)) + // temp view testView is not changed + checkAnswer( + sql(oldViewQuery), + sql("select * from testView")) + } + } + } + + test("CREATE VIEW: should allow CREATE permanent VIEW when a temp VIEW with same name exists") { withView("testView", "default.testView") { sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") sql("CREATE VIEW testView AS SELECT id FROM jt") + + // Both temporary and permanent view have been successfully created. + val catalog = spark.sessionState.catalog + val persistentView = catalog.getTableMetadata( + TableIdentifier(table = "testView", database = Some("default"))) + assert(persistentView.tableType == CatalogTableType.VIEW) + val tempView = catalog.getTableMetadata(TableIdentifier("testView")) + assert(tempView.tableType == CatalogTableType.VIEW) } }