@@ -360,6 +360,7 @@ trait CheckAnalysis extends PredicateHelper {

case InsertIntoTable(t, _, _, _, _)

Contributor:

cc @yhuai do you remember why we have this check? InsertIntoTable can only be used for a table, right? When will we hit this branch?

Member Author (@gatorsmile), Sep 15, 2016:

    spark.sql(
      s"""CREATE TEMPORARY VIEW normal_orc_source
         |USING org.apache.spark.sql.hive.orc
         |OPTIONS (
         |  PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
         |)
       """.stripMargin)
    sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

We allow users to insert rows into temporary views that are created using CREATE TEMPORARY VIEW.

Member Author:

If the temporary view is created by createOrReplaceTempView, users cannot insert rows into it.
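
To make the two cases concrete, here is a minimal sketch (the view names, path, and source table are illustrative, not taken from this patch):

    // A temporary view backed by a data source (CREATE TEMPORARY VIEW ... USING) accepts inserts.
    spark.sql(
      s"""CREATE TEMPORARY VIEW insertable_view
         |USING org.apache.spark.sql.hive.orc
         |OPTIONS (PATH '/tmp/orc_data')
       """.stripMargin)
    spark.sql("INSERT INTO TABLE insertable_view SELECT * FROM source_table")  // allowed

    // A temporary view registered from a Dataset is RDD-based, so the insert fails analysis
    // with "Inserting into an RDD-based table is not allowed."
    spark.range(10).createOrReplaceTempView("dataset_view")
    spark.sql("INSERT INTO TABLE dataset_view SELECT id FROM dataset_view")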

Contributor:

We can insert data into a temp view? Is it by design? cc @rxin @marmbrus @yhuai

Member Author:

I guess this might have been designed for JDBC data sources at the beginning. Just a guess.

Contributor:

This probably dates back to when we called these temp tables and didn't have persistent data source tables. Given that, I think we probably have to support this for compatibility. I'm not sure why this isn't a whitelist, though.

Member Author:

I guess the reason is that LogicalRelation is not in the sql/catalyst package. Otherwise, the check could be something like:

          case InsertIntoTable(t, _, _, _, _)
            if !t.isInstanceOf[SimpleCatalogRelation] && !t.isInstanceOf[LogicalRelation]

  if !t.isInstanceOf[LeafNode] ||
    t.isInstanceOf[Range] ||
    t == OneRowRelation ||
    t.isInstanceOf[LocalRelation] =>
  failAnalysis(s"Inserting into an RDD-based table is not allowed.")
@@ -246,27 +246,16 @@ class SessionCatalog(
}

/**
* Retrieve the metadata of an existing metastore table.
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
* Retrieve the metadata of an existing permanent table/view. If no database is specified,
* assume the table/view is in the current database. If the specified table/view is not found
* in the database then a [[NoSuchTableException]] is thrown.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
val tid = TableIdentifier(table)
if (isTemporaryTable(name)) {
CatalogTable(
identifier = tid,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = tempTables(table).output.toStructType,
properties = Map(),
viewText = None)
} else {
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
}
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
}

/**
@@ -281,6 +270,24 @@ class SessionCatalog(
externalCatalog.getTableOption(db, table)
}

/**
* Retrieve the metadata of an existing temporary view or permanent table/view.
* If the temporary view does not exist, tries to get the metadata of an existing permanent
* table/view. If no database is specified, assume the table/view is in the current database.
* If the specified table/view is not found in the database then a [[NoSuchTableException]] is
* thrown.
*/
def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
val table = formatTableName(name)
getTempView(table).map { plan =>
CatalogTable(
identifier = TableIdentifier(table),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = plan.output.toStructType)
}.getOrElse(getTableMetadata(TableIdentifier(name)))
}
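
A minimal usage sketch of this helper (the catalog, plan, and table names below are illustrative, in the spirit of the updated test suite further down):

    // `catalog` is a SessionCatalog and `tempPlan` some resolved logical plan (hypothetical setup).
    catalog.createTempView("view1", tempPlan, overrideIfExists = false)

    // A matching temporary view wins: a VIEW-typed CatalogTable is synthesized from its output schema.
    val tempMeta = catalog.getTempViewOrPermanentTableMetadata("view1")
    assert(tempMeta.tableType == CatalogTableType.VIEW)
    assert(tempMeta.identifier == TableIdentifier("view1"))

    // Without a matching temporary view, the call falls back to getTableMetadata against the
    // current database and throws NoSuchTableException when nothing is found there either.
    val permMeta = catalog.getTempViewOrPermanentTableMetadata("some_metastore_table")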

/**
* Load files stored in given path into an existing metastore table.
* If no database is specified, assume the table is in the current database.
@@ -530,11 +537,11 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}

@@ -547,11 +554,11 @@ class SessionCatalog(
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = {
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
}

@@ -566,12 +573,12 @@ class SessionCatalog(
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
val tableMetadata = getTableMetadata(tableName)
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}

@@ -585,11 +592,11 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
externalCatalog.alterPartitions(db, table, parts)
}

@@ -598,11 +605,11 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
externalCatalog.getPartition(db, table, spec)
}

@@ -444,27 +444,16 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
}

test("getTableMetadata on temporary views") {
test("getTempViewOrPermanentTableMetadata on temporary views") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10)
val m = intercept[AnalysisException] {
catalog.getTableMetadata(TableIdentifier("view1"))
}.getMessage
assert(m.contains("Table or view 'view1' not found in database 'default'"))

val m2 = intercept[AnalysisException] {
catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
intercept[NoSuchTableException] {
catalog.getTempViewOrPermanentTableMetadata("view1")
}.getMessage
assert(m2.contains("Table or view 'view1' not found in database 'default'"))

catalog.createTempView("view1", tempTable, overrideIfExists = false)
assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table == "view1")
assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name == "id")

val m3 = intercept[AnalysisException] {
catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
}.getMessage
assert(m3.contains("Table or view 'view1' not found in database 'default'"))
assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
TableIdentifier("view1"), "the temporary view `view1` should exist")
}

test("list tables without pattern") {
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -37,7 +38,9 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentwithDB = TableIdentifier(tableIdent.table, Some(db))
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentwithDB))

relation match {
case relation: CatalogRelation =>
@@ -264,7 +264,7 @@ case class AlterTableUnsetPropertiesCommand(
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
throw new AnalysisException(
s"Attempted to unset non-existent property '$k' in table '$tableName'")
s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
}
}
}
@@ -317,11 +317,11 @@ case class AlterTableSerDePropertiesCommand(
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
val part = catalog.getPartition(tableName, spec)
val part = catalog.getPartition(table.identifier, spec)
val newPart = part.copy(storage = part.storage.copy(
serde = serdeClassName.orElse(part.storage.serde),
properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
catalog.alterPartitions(tableName, Seq(newPart))
catalog.alterPartitions(table.identifier, Seq(newPart))
}
Seq.empty[Row]
}
@@ -358,7 +358,7 @@ case class AlterTableAddPartitionCommand(
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = location))
}
catalog.createPartitions(tableName, parts, ignoreIfExists = ifNotExists)
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
Seq.empty[Row]
}

@@ -422,7 +422,7 @@ case class AlterTableDropPartitionCommand(
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}

@@ -471,26 +471,20 @@ case class AlterTableRecoverPartitionsCommand(

override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
}
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on temporary tables: $tableName")
}
val table = catalog.getTableMetadata(tableName)
val tableIdentWithDB = table.identifier.quotedString
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
}
if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on table with location provided: $tableName")
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
s"location provided: $tableIdentWithDB")
}

val root = new Path(table.storage.locationUri.get)
@@ -659,7 +653,7 @@ case class AlterTableSetLocationCommand(
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(tableName, spec)
val part = catalog.getPartition(table.identifier, spec)
val newPart =
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
Expand All @@ -668,7 +662,7 @@ case class AlterTableSetLocationCommand(
} else {
part.copy(storage = part.storage.copy(locationUri = Some(location)))
}
catalog.alterPartitions(tableName, Seq(newPart))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
val newTable =