@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -51,7 +50,7 @@ private[libsvm] class LibSVMOutputWriter(
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
@@ -971,7 +971,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Storage format
val defaultStorage: CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
CatalogStorageFormat(
locationUri = None,
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
@@ -1115,7 +1115,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitGenericFileFormat(
ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
val source = ctx.identifier.getText
HiveSerDe.sourceToSerDe(source, conf) match {
HiveSerDe.sourceToSerDe(source) match {
case Some(s) =>
CatalogStorageFormat.empty.copy(
inputFormat = s.inputFormat,
@@ -17,18 +17,13 @@

package org.apache.spark.sql.execution.command

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types._

@@ -97,16 +92,19 @@ case class CreateDataSourceTableCommand(
}
}

CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
val table = CatalogTable(
identifier = tableIdent,
tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
schema = dataSource.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)

provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = bucketSpec
)

// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(table, ignoreIfExists = false)
Review comment from @gatorsmile (Member), Aug 18, 2016:

It sounds like we are not following the existing behavior. options might be consumed by external Data Source connectors; options is not only used to specify the path, it is also a channel for passing extra parameters to the data source.

I checked the existing implementation of createDataSourceTable. We pass the original options into the constructor of DataSource. Then, the write API passes the options to createRelation.

How about adding options as an independent field in CatalogTable?

Reply from the PR author (Contributor):

I put the options in CatalogStorageFormat.properties, and when the table is read back we use storage.properties as the data source options for creating the relation; see https://github.com/apache/spark/pull/14155/files#diff-d99813bd5bbc18277e4090475e4944cfR214

Reply from @gatorsmile (Member), Aug 18, 2016:

That line just puts the options into the storage properties. It works for path, but the existing Spark code passes the user-specified options into createRelation. I think options is a critical parameter-passing channel for external data source connectors.

Reply from @gatorsmile (Member), Aug 18, 2016:

@cloud-fan What about the write path? My previous reply to @yhuai was about the write path.

Follow-up reply from a Member:

Never mind; it sounds like the write API is only called by CTAS and the save API of DataFrameWriter, so it is OK. Let me read it again and check whether we might have an issue with options in the CREATE data source table command.

Seq.empty[Row]
}
}
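For context on the thread above: with this change the user-specified options live in CatalogStorageFormat.properties, so a read path along the following lines can hand them straight back to the connector. This is a hedged sketch with an assumed helper name, not code from this PR; it only uses the DataSource constructor arguments and CatalogTable fields that appear elsewhere in this diff.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

// Hypothetical read-side helper: rebuild a relation for a data source table
// whose user-specified OPTIONS were persisted in storage.properties above.
object DataSourceTableReadPath {
  def resolveDataSourceTable(sparkSession: SparkSession, table: CatalogTable): BaseRelation = {
    val dataSource = DataSource(
      sparkSession,
      // the provider name is stored directly in CatalogTable.provider
      className = table.provider.get,
      userSpecifiedSchema = Some(table.schema),
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      // everything the user passed as OPTIONS (including "path") comes back
      // from storage.properties and is forwarded to the connector's createRelation
      options = table.storage.properties)
    dataSource.resolveRelation(checkPathExist = true)
  }
}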
@@ -193,7 +191,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
existingSchema = Some(l.schema)
case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
existingSchema = Some(s.metadata.schema)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -233,226 +231,21 @@ case class CreateDataSourceTableAsSelectCommand(
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
schema = result.schema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)
val schema = result.schema
val table = CatalogTable(
identifier = tableIdent,
tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
schema = schema,
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = bucketSpec
)
sessionState.catalog.createTable(table, ignoreIfExists = false)
}

// Refresh the cache of the table in the catalog.
sessionState.catalog.refreshTable(tableIdent)
Seq.empty[Row]
}
}
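The comment near the top of this hunk notes that the resolved relation's schema is preferred over df's schema because a relation provider may change nullability. A small self-contained illustration of that effect, assuming a local session and a placeholder path (not part of the patch):

import org.apache.spark.sql.SparkSession

// Writing a non-nullable column through a file-based provider and reading it
// back yields a nullable column, which is why the table schema is taken from
// the resolved relation rather than from df.
object NullabilityExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("nullability-example").getOrCreate()
  import spark.implicits._

  val df = Seq(1, 2, 3).toDF("id")                                 // "id" starts out non-nullable
  df.write.mode("overwrite").parquet("/tmp/nullability-example")   // placeholder path

  spark.read.parquet("/tmp/nullability-example").printSchema()
  // root
  //  |-- id: integer (nullable = true)

  spark.stop()
}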


object CreateDataSourceTableUtils extends Logging {

val DATASOURCE_PREFIX = "spark.sql.sources."
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."

def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
schema: StructType,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
}

if (sortColumnNames.nonEmpty) {
tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
}
}
}

val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
} else {
tableProperties.put("EXTERNAL", "FALSE")
CatalogTableType.MANAGED
}

val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
options = options)

def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
CatalogTable(
identifier = tableIdent,
tableType = tableType,
schema = new StructType,
provider = Some(provider),
storage = CatalogStorageFormat(
locationUri = None,
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
properties = options
),
properties = tableProperties.toMap)
}

def newHiveCompatibleMetastoreTable(
relation: HadoopFsRelation,
serde: HiveSerDe): CatalogTable = {
assert(partitionColumns.isEmpty)
assert(relation.partitionSchema.isEmpty)

CatalogTable(
identifier = tableIdent,
tableType = tableType,
storage = CatalogStorageFormat(
locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
inputFormat = serde.inputFormat,
outputFormat = serde.outputFormat,
serde = serde.serde,
compressed = false,
properties = options
),
schema = relation.schema,
provider = Some(provider),
properties = tableProperties.toMap,
viewText = None)
}

// TODO: Support persisting partitioned data source relations in Hive compatible format
val qualifiedTableName = tableIdent.quotedString
val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
case _ if skipHiveMetadata =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
(None, message)

case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
val message =
s"Persisting data source relation $qualifiedTableName with a single input path " +
s"into Hive metastore in Hive compatible format. Input path: " +
s"${relation.location.paths.head}."
(Some(hiveTable), message)

case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
val message =
s"Persisting partitioned data source relation $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
"Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
(None, message)

case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
val message =
s"Persisting bucketed data source relation $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
"Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
(None, message)

case (Some(serde), relation: HadoopFsRelation) =>
val message =
s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
(None, message)

case (Some(serde), _) =>
val message =
s"Data source relation $qualifiedTableName is not a " +
s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
"in Spark SQL specific format, which is NOT compatible with Hive."
(None, message)

case _ =>
val message =
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
s"Spark SQL specific format, which is NOT compatible with Hive."
(None, message)
}

(hiveCompatibleTable, logMessage) match {
case (Some(table), message) =>
// We first try to save the metadata of the table in a Hive compatible way.
// If Hive throws an error, we fall back to save its metadata in the Spark SQL
// specific way.
try {
logInfo(message)
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
s"it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
val table = newSparkSQLSpecificMetastoreTable()
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
}

case (None, message) =>
logWarning(message)
val table = newSparkSQLSpecificMetastoreTable()
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
}
}
}
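The object deleted above stores the table schema by splitting its JSON representation across numbered table properties, because a single metastore property value is limited in length (the schemaStringLengthThreshold conf). For context, a rough sketch of how such split properties would be stitched back together on read; this is an assumed standalone version, with the property keys written out literally, not the PR's code (the real logic lived in helpers like DDLUtils.getSchemaFromTableProperties, whose call site is replaced earlier in this diff).

import org.apache.spark.sql.types.{DataType, StructType}

// Rough sketch of reassembling a schema that createDataSourceTable split
// across "spark.sql.sources.schema.part.N" table properties.
object SchemaPropsExample {
  private val NumPartsKey = "spark.sql.sources.schema.numParts"
  private val PartPrefix  = "spark.sql.sources.schema.part."

  def schemaFromProperties(props: Map[String, String]): Option[StructType] = {
    props.get(NumPartsKey).map { numParts =>
      // Concatenate the JSON fragments in index order, then parse the schema.
      val json = (0 until numParts.toInt).map { i =>
        props.getOrElse(
          s"$PartPrefix$i",
          sys.error(s"Corrupted schema in table properties: missing part $i of $numParts"))
      }.mkString
      DataType.fromJson(json).asInstanceOf[StructType]
    }
  }
}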