@@ -21,8 +21,7 @@ import java.util.Date

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
@@ -112,6 +111,8 @@ case class BucketSpec(
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*
* @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
* Can be None if this table is a view; should be "hive" for Hive serde tables.
* @param unsupportedFeatures is a list of string descriptions of features that are used by the
* underlying table but not supported by Spark SQL yet.
*/
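
As a quick illustration of the new provider field documented above (a sketch only, not code from this patch; the helper name is made up), downstream code could tell the three cases apart like this:

// Sketch: interpreting CatalogTable.provider (helper name is hypothetical).
import org.apache.spark.sql.catalyst.catalog.CatalogTable

def describeProvider(table: CatalogTable): String = table.provider match {
  case None         => "view (no data source provider)"
  case Some("hive") => "hive serde table"
  case Some(ds)     => s"data source table backed by $ds (e.g. parquet, json)"
}
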
@@ -120,6 +121,7 @@ case class CatalogTable(
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: StructType,
provider: Option[String] = None,
partitionColumnNames: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
owner: String = "",
@@ -131,16 +133,6 @@ case class CatalogTable(
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty) {

// Verify that the provided columns are part of the schema
private val colNames = schema.map(_.name).toSet
private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")

/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
c => partitionColumnNames.contains(c.name)
@@ -189,6 +181,7 @@ case class CatalogTable(
s"Last Access: ${new Date(lastAccessTime).toString}",
s"Type: ${tableType.name}",
if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
if (provider.isDefined) s"Provider: ${provider.get}" else "",
if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
) ++ bucketStrings ++ Seq(
viewOriginalText.map("Original View: " + _).getOrElse(""),
@@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string")
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some("hive")
)

catalog.createTable("db1", table, ignoreIfExists = false)
@@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
storage = CatalogStorageFormat(
Some(Utils.createTempDir().getAbsolutePath),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string")
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some("hive")
)
catalog.createTable("db1", externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
@@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("col2", "string")
.add("a", "int")
.add("b", "string"),
provider = Some("hive"),
partitionColumnNames = Seq("a", "b")
)
catalog.createTable("db1", table, ignoreIfExists = false)
@@ -692,6 +695,7 @@ abstract class CatalogTestUtils {
.add("col2", "string")
.add("a", "int")
.add("b", "string"),
provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
24 changes: 13 additions & 11 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,10 +23,11 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.StructType

/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")

case _ =>
val cmd =
CreateTableUsingAsSelect(
tableIdent,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
getBucketSpec,
mode,
extraOptions.toMap,
df.logicalPlan)
val tableDesc = CatalogTable(
identifier = tableIdent,
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
schema = new StructType,
provider = Some(source),
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec
)
val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
@hvanhovell (Contributor), Aug 3, 2016:

NIT: we really shouldn't use the Some(...) constructor; use Option(...) instead.

...this happens a few times - but I'll stop complaining after this one :)...

PR author (Contributor):

Hmm, do we have to use Option even when the parameter is guaranteed to be non-null?

In this case we can't use Option, or the behaviour would change. Previously, if df.logicalPlan is null it's a bug and we throw an NPE somewhere. If we used Option here, we would silently convert a CTAS into a plain CREATE TABLE, which is not expected.

@hvanhovell (Contributor):

I know I am nitpicking here :).

I have seen a few cases in which someone thought a parameter was non-null and wrapped it in Some(...), resulting in a very nice NPE down the line (which you don't expect from an Option). In this case you are totally right to use Some(...).
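
A minimal sketch of the distinction being discussed (illustration only, not part of the patch): Option(...) maps null to None, while Some(...) keeps the null so the bug still surfaces.

// Option(null) silently becomes None, which here would turn the CTAS into a plain CREATE TABLE;
// Some(null) stays defined, so a null plan still fails later with an NPE, as intended above.
val plan: String = null          // stand-in for a logical plan that is null due to an upstream bug
assert(Option(plan).isEmpty)     // Option swallows the null
assert(Some(plan).isDefined)     // Some preserves it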

df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
8 changes: 2 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -24,7 +24,6 @@ import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -35,18 +34,16 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -174,8 +171,7 @@ class Dataset[T] private[sql](
@transient private[sql] val logicalPlan: LogicalPlan = {
def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
case _: Command |
_: InsertIntoTable |
_: CreateTableUsingAsSelect => true
_: InsertIntoTable => true
case _ => false
}

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.{DataType, StructType}

@@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
* Create a [[CreateTable]] logical plan.
*/
override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -319,12 +319,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
val schema = Option(ctx.colTypeList()).map(createStructType)
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

val tableDesc = CatalogTable(
identifier = table,
// TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
// physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
// boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
// setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
// make it take `CatalogTable` directly.
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(properties = options),
schema = schema.getOrElse(new StructType),
provider = Some(provider),
Contributor:

Is it still possible that the provider is None?

PR author (Contributor):

Well, just looking at the surrounding code, it is possible that provider is null. However, if we also look at the antlr file, the provider must be specified. The previous code doesn't check for null either, and will throw an NPE somewhere if it's null. So I think we should use Some here to keep that behavior.

Contributor:

It was a more general question, which you have answered - sorry about that.

In this case the provider cannot be null: the grammar requires a provider, and the call ctx.tableProvider.qualifiedName.getText would result in an NPE if it were null.

partitionColumnNames = partitionColumnNames,
bucketSpec = bucketSpec
)

// Determine the storage mode.
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

if (ctx.query != null) {
// Get the backing query.
val query = plan(ctx.query)
@@ -333,32 +352,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
}

// Determine the storage mode.
val mode = if (ifNotExists) {
SaveMode.Ignore
} else {
SaveMode.ErrorIfExists
}

CreateTableUsingAsSelect(
table, provider, partitionColumnNames, bucketSpec, mode, options, query)
CreateTable(tableDesc, mode, Some(query))
} else {
val struct = Option(ctx.colTypeList()).map(createStructType)
if (struct.isEmpty && bucketSpec.nonEmpty) {
throw new ParseException(
"Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
}
if (temp) {
if (ifNotExists) {
operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
}

CreateTableUsing(
table,
struct,
provider,
temp,
options,
partitionColumnNames,
bucketSpec,
ifNotExists,
managedIfNoPath = true)
logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
"CREATE TEMPORARY VIEW ... USING ... instead")
CreateTempViewUsing(table, schema, replace = true, provider, options)
} else {
CreateTable(tableDesc, mode, None)
}
}
}
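
To illustrate the deprecation handled in the temporary-table branch above (a sketch; the table name, path, and the spark session value are made-up examples), the old syntax still parses but logs a warning and is planned as a CreateTempViewUsing, and the warning points users at the temporary-view syntax instead:

spark.sql("CREATE TEMPORARY TABLE t USING parquet OPTIONS (path '/tmp/t')")  // deprecated form, logs a warning
spark.sql("CREATE TEMPORARY VIEW t USING parquet OPTIONS (path '/tmp/t')")   // suggested replacement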

@@ -891,8 +897,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/**
* Create a table, returning either a [[CreateTableCommand]] or a
* [[CreateHiveTableAsSelectLogicalPlan]].
* Create a table, returning a [[CreateTable]] logical plan.
*
* This is not used to create datasource tables, which is handled through
* "CREATE TABLE ... USING ...".
@@ -933,23 +938,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)

// Ensuring whether no duplicate name is used in table definition
val colNames = dataCols.map(_.name)
if (colNames.length != colNames.distinct.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
}
operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
duplicateColumns.mkString("[", ",", "]"), ctx)
}

// For Hive tables, partition columns must not be part of the schema
val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
if (badPartCols.nonEmpty) {
operationNotAllowed(s"Partition columns may not be specified in the schema: " +
badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
}

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
val schema = StructType(dataCols ++ partitionCols)
@@ -1001,10 +989,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
tableType = tableType,
storage = storage,
schema = schema,
provider = Some("hive"),
partitionColumnNames = partitionCols.map(_.name),
properties = properties,
comment = comment)

val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists

selectQuery match {
case Some(q) =>
// Just use whatever is projected in the select statement as our schema
@@ -1025,27 +1016,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
if (conf.convertCTAS && !hasStorageProperties) {
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
val optionsWithPath = if (location.isDefined) {
Map("path" -> location.get)
} else {
Map.empty[String, String]
}
CreateTableUsingAsSelect(
tableIdent = tableDesc.identifier,
provider = conf.defaultDataSourceName,
partitionColumns = tableDesc.partitionColumnNames.toArray,
bucketSpec = None,
mode = mode,
options = optionsWithPath,
q

val newTableDesc = tableDesc.copy(
storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
provider = Some(conf.defaultDataSourceName)
)

CreateTable(newTableDesc, mode, Some(q))
} else {
CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
CreateTable(tableDesc, mode, Some(q))
}
case None => CreateTableCommand(tableDesc, ifNotExists)
case None => CreateTable(tableDesc, mode, None)
}
}
