Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, TemporaryView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
Expand Down Expand Up @@ -2429,7 +2429,7 @@ class Dataset[T] private[sql](
*/
@throws[AnalysisException]
def createTempView(viewName: String): Unit = withPlan {
createViewCommand(viewName, replace = false)
createTempViewCommand(viewName, replace = false)
}

/**
Expand All @@ -2440,20 +2440,19 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def createOrReplaceTempView(viewName: String): Unit = withPlan {
createViewCommand(viewName, replace = true)
createTempViewCommand(viewName, replace = true)
}

private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
private def createTempViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
CreateViewCommand(
name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
allowExisting = false,
replace = replace,
isTemporary = true)
if (replace) SaveMode.Overwrite else SaveMode.ErrorIfExists,
TemporaryView)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1254,22 +1254,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ic.identifier.getText -> Option(ic.STRING).map(string)
}
}
val viewType = if (ctx.TEMPORARY != null) TemporaryView else PermanentView
createView(
ctx,
ctx.tableIdentifier,
comment = Option(ctx.STRING).map(string),
userSpecifiedColumns,
ctx.query,
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty),
allowExisting = ctx.EXISTS != null,
ignoreIfExists = ctx.EXISTS != null,
replace = ctx.REPLACE != null,
isTemporary = ctx.TEMPORARY != null
viewType
)
}
}

/**
* Alter the query of a view. This creates a [[CreateViewCommand]] command.
* Alter the query of a view. This creates a [[CreateViewCommand]] command. The view type could be
* either temporary or permanent.
*/
override def visitAlterViewQuery(ctx: AlterViewQueryContext): LogicalPlan = withOrigin(ctx) {
createView(
Expand All @@ -1279,9 +1281,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
userSpecifiedColumns = Seq.empty,
query = ctx.query,
properties = Map.empty,
allowExisting = false,
ignoreIfExists = false,
replace = true,
isTemporary = false)
AnyTypeView
)
}

/**
Expand All @@ -1294,20 +1297,27 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
userSpecifiedColumns: Seq[(String, Option[String])],
query: QueryContext,
properties: Map[String, String],
allowExisting: Boolean,
ignoreIfExists: Boolean,
replace: Boolean,
isTemporary: Boolean): LogicalPlan = {
viewType: ViewType): LogicalPlan = {
val mode = (replace, ignoreIfExists) match {
case (true, false) => SaveMode.Overwrite
case (false, true) => SaveMode.Ignore
case (false, false) => SaveMode.ErrorIfExists
case _ => throw new ParseException(
"CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.", ctx)
}
val originalText = source(query)

CreateViewCommand(
visitTableIdentifier(name),
userSpecifiedColumns,
comment,
properties,
Some(originalText),
plan(query),
allowExisting = allowExisting,
replace = replace,
isTemporary = isTemporary)
mode,
viewType)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
query)
ExecutedCommandExec(cmd) :: Nil

case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil

case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
Expand All @@ -41,14 +41,12 @@ import org.apache.spark.sql.types.StructType
* Dataset API.
* @param child the logical plan that represents the view; this is used to generate a canonicalized
* version of the SQL that can be saved in the catalog.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
* @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
* at the end of current Spark session. Existing permanent relations with the same
* name are not visible to the current session while the temporary view exists,
* unless they are specified with full qualified table name with database prefix.
* @param mode only three modes are supported here: Ignore, Overwrite and ErrorIfExists. If the
* view already exists, CreateViewCommand behaves based on the mode:
* 1) Overwrite: update the view;
* 2) Ignore: noop;
* 3) ErrorIfExists throws analysis exception.
* @param viewType the type of this view.
*/
case class CreateViewCommand(
name: TableIdentifier,
Expand All @@ -57,9 +55,8 @@ case class CreateViewCommand(
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
isTemporary: Boolean)
mode: SaveMode,
viewType: ViewType)
extends RunnableCommand {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
Expand All @@ -69,23 +66,17 @@ case class CreateViewCommand(

override def output: Seq[Attribute] = Seq.empty[Attribute]

if (!isTemporary) {
require(originalText.isDefined,
"The table to created with CREATE VIEW must have 'originalText'.")
}

if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
}
assert(mode != SaveMode.Append,
"CREATE or ALTER VIEW can only use ErrorIfExists, Ignore or Overwrite.")

// Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

originalText is empty only when the view is created through Dataset APIs, as shown in the code comment

This checking looks useless. If you think we should keep it, I can add it back. Thanks!

if (allowExisting && isTemporary) {
if (mode == SaveMode.Ignore && viewType == TemporaryView) {
throw new AnalysisException(
"It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
}

// Temporary view names should NOT contain database prefix like "database.table"
if (isTemporary && name.database.isDefined) {
if (viewType == TemporaryView && name.database.isDefined) {
val database = name.database.get
throw new AnalysisException(
s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
Expand All @@ -105,38 +96,40 @@ case class CreateViewCommand(
}
val sessionState = sparkSession.sessionState

if (isTemporary) {
createTemporaryView(sparkSession, analyzedPlan)
} else {
// Adds default database for permanent table if it doesn't exist, so that tableExists()
// only check permanent tables.
val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val qualifiedName = name.copy(database = Option(database))

if (sessionState.catalog.tableExists(qualifiedName)) {
val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
if (allowExisting) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
} else if (tableMetadata.tableType != CatalogTableType.VIEW) {
throw new AnalysisException(
"Existing table is not a view. The following is an existing table, " +
s"not a view: $qualifiedName")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
viewType match {
case TemporaryView =>
createTemporaryView(sparkSession, analyzedPlan)
case AnyTypeView if sessionState.catalog.isTemporaryTable(name) =>
createTemporaryView(sparkSession, analyzedPlan)
case _ =>
// Adds default database for permanent table if it doesn't exist, so that tableExists()
// only check permanent tables.
val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val qualifiedName = name.copy(database = Option(database))
if (sessionState.catalog.tableExists(qualifiedName)) {
val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
if (mode == SaveMode.Ignore) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target
// view already exists.
} else if (tableMetadata.tableType != CatalogTableType.VIEW) {
throw new AnalysisException(
"Existing table is not a view. The following is an existing table, " +
s"not a view: $qualifiedName")
} else if (mode == SaveMode.Overwrite) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
throw new AnalysisException(
s"View $qualifiedName already exists. If you want to update the view definition, " +
"please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
}
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
throw new AnalysisException(
s"View $qualifiedName already exists. If you want to update the view definition, " +
"please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
// Create the view if it doesn't exist.
sessionState.catalog.createTable(
prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
} else {
// Create the view if it doesn't exist.
sessionState.catalog.createTable(
prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
}
}
Seq.empty[Row]
}
Expand All @@ -154,7 +147,7 @@ case class CreateViewCommand(
sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}

catalog.createTempView(name.table, logicalPlan, replace)
catalog.createTempView(name.table, logicalPlan, mode == SaveMode.Overwrite)
}

/**
Expand Down Expand Up @@ -202,3 +195,27 @@ case class CreateViewCommand(
)
}
}

/**
* The trait used to represent the type of a view.
*/
sealed trait ViewType

/**
* Temporary means local temporary views. The views are session-scoped and automatically dropped
* when the session terminates. Do not qualify a temporary table with a schema name. Existing
* permanent tables or views with the same name are not visible while the temporary view exists,
* unless they are referenced with schema-qualified names.
*/
case object TemporaryView extends ViewType

/**
* Permanent means global permanent views. The views are global-scoped and accessible by all
* sessions. The permanent views stays until they are explicitly dropped.
*/
case object PermanentView extends ViewType

/**
* Any means the view type is unknown.
*/
case object AnyTypeView extends ViewType
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,27 @@ class HiveDDLCommandSuite extends PlanTest {
test("create view -- basic") {
val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1"
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(!command.allowExisting)
assert(command.mode == SaveMode.ErrorIfExists)
assert(command.viewType == PermanentView)
assert(command.name.database.isEmpty)
assert(command.name.table == "view1")
assert(command.originalText == Some("SELECT * FROM tab1"))
assert(command.userSpecifiedColumns.isEmpty)
}

test("create view -- IF NOT EXISTS") {
val v1 = "CREATE VIEW IF NOT EXISTS view1 AS SELECT * FROM tab1"
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(command.mode == SaveMode.Ignore)
assert(command.viewType == PermanentView)

val v2 = "CREATE OR REPLACE VIEW IF NOT EXISTS view1 AS SELECT * FROM tab1"
val e = intercept[ParseException] {
parser.parsePlan(v2).asInstanceOf[CreateViewCommand]
}.getMessage
assert(e.contains("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed"))
}

test("create view - full") {
val v1 =
"""
Expand All @@ -485,6 +499,8 @@ class HiveDDLCommandSuite extends PlanTest {
|AS SELECT * FROM tab1
""".stripMargin
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(command.mode == SaveMode.Overwrite)
assert(command.viewType == PermanentView)
assert(command.name.database.isEmpty)
assert(command.name.table == "view1")
assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello")))
Expand All @@ -493,6 +509,32 @@ class HiveDDLCommandSuite extends PlanTest {
assert(command.comment == Some("BLABLA"))
}

test("create temporary view") {
val v1 = "CREATE TEMPORARY VIEW testView AS SELECT id FROM jt"
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(command.viewType == TemporaryView)
assert(command.mode == SaveMode.ErrorIfExists)
assert(command.name.database.isEmpty)
assert(command.name.table == "testView")
assert(command.originalText == Option("SELECT id FROM jt"))
assert(command.userSpecifiedColumns.isEmpty)
assert(command.comment.isEmpty)
assert(command.properties.isEmpty)
}

test("alter view as select") {
val v1 = "ALTER VIEW testView AS SELECT id FROM jt"
val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand]
assert(command.mode == SaveMode.Overwrite)
assert(command.viewType == AnyTypeView)
assert(command.name.database.isEmpty)
assert(command.name.table == "testView")
assert(command.originalText == Option("SELECT id FROM jt"))
assert(command.userSpecifiedColumns.isEmpty)
assert(command.comment.isEmpty)
assert(command.properties.isEmpty)
}

test("create view -- partitioned view") {
val v1 = "CREATE VIEW view1 partitioned on (ds, hr) as select * from srcpart"
intercept[ParseException] {
Expand Down
Loading