Commit 30f3f55

[SPARK-7763] [SPARK-7616] [SQL] Persists partition columns into metastore
Author: Yin Huai <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #6285 from liancheng/spark-7763 and squashes the following commits:

bb2829d [Yin Huai] Fix hashCode.
d677f7d [Cheng Lian] Fixes Scala style issue
44b283f [Cheng Lian] Adds test case for SPARK-7616
6733276 [Yin Huai] Fix a bug that potentially causes https://issues.apache.org/jira/browse/SPARK-7616.
6cabf3c [Yin Huai] Update unit test.
7e02910 [Yin Huai] Use metastore partition columns and do not hijack maybePartitionSpec.
e9a03ec [Cheng Lian] Persists partition columns into metastore
1 parent 311fab6 commit 30f3f55
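
For context, the user-facing behavior this commit targets can be sketched as follows. This is a minimal sketch assuming Spark 1.4-era APIs, a HiveContext bound to hiveContext, and illustrative table and column names; it is not part of the patch itself.

// Build a small DataFrame with a partition column `part` (illustrative data).
val df = hiveContext
  .createDataFrame((1 to 10).map(i => (i, s"val_$i", s"p${i % 2}")))
  .toDF("key", "value", "part")

// Save it as a partitioned data source table. With this patch, the partition column is
// persisted into the metastore table definition instead of being dropped.
df.write.format("parquet").partitionBy("part").saveAsTable("persisted_partitions")

// When the table is loaded back, the partition columns recorded in the metastore are
// handed to ResolvedDataSource, so `part` is still treated as a partition column.
hiveContext.table("persisted_partitions").printSchema()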

File tree: 12 files changed, 211 additions (+), 56 deletions (-)

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 19 additions & 7 deletions
@@ -49,8 +49,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
       schema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
-    new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
+    new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext)
   }
 }
 
@@ -118,12 +117,28 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
 private[sql] class ParquetRelation2(
     override val paths: Array[String],
     private val maybeDataSchema: Option[StructType],
+    // This is for metastore conversion.
     private val maybePartitionSpec: Option[PartitionSpec],
+    override val userDefinedPartitionColumns: Option[StructType],
     parameters: Map[String, String])(
     val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec)
   with Logging {
 
+  private[sql] def this(
+      paths: Array[String],
+      maybeDataSchema: Option[StructType],
+      maybePartitionSpec: Option[PartitionSpec],
+      parameters: Map[String, String])(
+      sqlContext: SQLContext) = {
+    this(
+      paths,
+      maybeDataSchema,
+      maybePartitionSpec,
+      maybePartitionSpec.map(_.partitionColumns),
+      parameters)(sqlContext)
+  }
+
   // Should we merge schemas from all Parquet part-files?
   private val shouldMergeSchemas =
     parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
@@ -161,15 +176,15 @@ private[sql] class ParquetRelation2(
         Boolean.box(shouldMergeSchemas),
         paths.toSet,
         maybeDataSchema,
-        maybePartitionSpec)
+        partitionColumns)
     } else {
       Objects.hashCode(
         Boolean.box(shouldMergeSchemas),
         paths.toSet,
         dataSchema,
         schema,
         maybeDataSchema,
-        maybePartitionSpec)
+        partitionColumns)
     }
   }
 
@@ -185,9 +200,6 @@ private[sql] class ParquetRelation2(
 
   override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
 
-  override def userDefinedPartitionColumns: Option[StructType] =
-    maybePartitionSpec.map(_.partitionColumns)
-
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = ContextUtil.getConfiguration(job)
 

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 2 additions & 0 deletions
@@ -93,6 +93,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
     job.setOutputValueClass(classOf[Row])
     FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
 
+    // We create a DataFrame by applying the schema of the relation to the data, to make
+    // sure we are writing the data based on the expected schema.
     val df = sqlContext.createDataFrame(
       DataFrame(sqlContext, query).queryExecution.toRdd,
       relation.schema,
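
The comment added above refers to SQLContext.createDataFrame(rowRDD, schema), which reinterprets the rows produced by the query under an explicit schema. A minimal standalone sketch of that pattern, with an assumed sqlContext and illustrative rows and field names:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Rows produced by some upstream plan; their positional order must already match the
// schema we are about to apply (illustrative values).
val rows = sqlContext.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))

// The schema the relation expects.
val expectedSchema = StructType(Seq(
  StructField("key", IntegerType, nullable = true),
  StructField("value", StringType, nullable = true)))

// Apply the schema positionally, as InsertIntoHadoopFsRelation does with relation.schema.
val df = sqlContext.createDataFrame(rows, expectedSchema)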

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 15 additions & 4 deletions
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
@@ -245,12 +245,13 @@ private[sql] object ResolvedDataSource {
           SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
         }
 
-        val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name)))
+        val dataSchema =
+          StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
 
         dataSource.createRelation(
           sqlContext,
           paths,
-          Some(schema),
+          Some(dataSchema),
           maybePartitionsSchema,
           caseInsensitiveOptions)
       case dataSource: org.apache.spark.sql.sources.RelationProvider =>
@@ -320,10 +321,20 @@ private[sql] object ResolvedDataSource {
           Some(dataSchema.asNullable),
           Some(partitionColumnsSchema(data.schema, partitionColumns)),
           caseInsensitiveOptions)
+
+        // For a partitioned relation r, r.schema's column ordering is different from the
+        // column ordering of data.logicalPlan. We need a Project to adjust the ordering,
+        // so that inside InsertIntoHadoopFsRelation we can safely apply r.schema to
+        // the data.
+        val project =
+          Project(
+            r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
+            data.logicalPlan)
+
         sqlContext.executePlan(
           InsertIntoHadoopFsRelation(
             r,
-            data.logicalPlan,
+            project,
             partitionColumns.toArray,
             mode)).toRdd
         r
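
Why the Project matters: for a partitioned relation, r.schema places the data columns first and the partition columns last, which generally differs from the column order of data.logicalPlan. At the DataFrame level the same reordering can be expressed as a select by name; a hedged sketch with an assumed sqlContext and illustrative column names:

import org.apache.spark.sql.functions.col

// Incoming data in query order: (part, key, value).
val data = sqlContext
  .createDataFrame(Seq(("p0", 1, "a"), ("p1", 2, "b")))
  .toDF("part", "key", "value")

// Suppose the relation's schema orders the columns as (key, value, part): data columns
// first, partition column last (illustrative).
val relationColumnOrder = Seq("key", "value", "part")

// Reorder by name so that positionally applying the relation's schema is safe; this is
// what the Project of UnresolvedAttributes above does at the logical plan level.
val reordered = data.select(relationColumnOrder.map(col): _*)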

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 27 additions & 4 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SerializableWritable
-import org.apache.spark.sql._
+import org.apache.spark.sql.{Row, _}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -120,11 +120,13 @@ trait HadoopFsRelationProvider {
   * Returns a new base relation with the given parameters, a user defined schema, and a list of
   * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
   * is enforced by the Map that is passed to the function.
+   *
+   * @param dataSchema Schema of data columns (i.e., columns that are not partition columns).
   */
  def createRelation(
      sqlContext: SQLContext,
      paths: Array[String],
-      schema: Option[StructType],
+      dataSchema: Option[StructType],
      partitionColumns: Option[StructType],
      parameters: Map[String, String]): HadoopFsRelation
 }
@@ -416,8 +418,29 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   final private[sql] def partitionSpec: PartitionSpec = {
     if (_partitionSpec == null) {
       _partitionSpec = maybePartitionSpec
-        .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
-        .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+        .flatMap {
+          case spec if spec.partitions.nonEmpty =>
+            Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+          case _ =>
+            None
+        }
+        .orElse {
+          // We only know the partition columns and their data types. We need to discover
+          // partition values.
+          userDefinedPartitionColumns.map { partitionSchema =>
+            val spec = discoverPartitions()
+            val castedPartitions = spec.partitions.map { case p @ Partition(values, path) =>
+              val literals = values.toSeq.zip(spec.partitionColumns.map(_.dataType)).map {
+                case (value, dataType) => Literal.create(value, dataType)
+              }
+              val castedValues = partitionSchema.zip(literals).map { case (field, literal) =>
+                Cast(literal, field.dataType).eval()
+              }
+              p.copy(values = Row.fromSeq(castedValues))
+            }
+            PartitionSpec(partitionSchema, castedPartitions)
+          }
+        }
         .getOrElse {
           if (sqlContext.conf.partitionDiscoveryEnabled()) {
             discoverPartitions()
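
In the branch added above, discoverPartitions() infers partition values from directory names, so their types may not match the user-defined partition schema; each value is therefore wrapped in a Literal and cast to the declared type. A small sketch of that casting step using Catalyst expressions (the value and types are illustrative assumptions):

import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType}

// Suppose discovery yielded the partition value "2015" as a string (illustrative),
// while the user-defined partition column declares IntegerType.
val discovered = Literal.create("2015", StringType)

// Cast to the declared type before building the PartitionSpec; eval() folds the
// constant expression into the typed value 2015.
val casted = Cast(discovered, IntegerType).eval()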

sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 7 additions & 0 deletions
@@ -78,4 +78,11 @@ trait SQLTestUtils {
   protected def withTempTable(tableName: String)(f: => Unit): Unit = {
     try f finally sqlContext.dropTempTable(tableName)
   }
+
+  /**
+   * Drops table `tableName` after calling `f`.
+   */
+  protected def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+  }
 }
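
A hypothetical usage of the new helper in a test that mixes in SQLTestUtils, assuming sqlContext is backed by a HiveContext (as in the Hive test suites); the table name and query are illustrative, and the table is dropped even if the body throws:

withTable("persisted_partitions") {
  sqlContext.sql(
    """CREATE TABLE persisted_partitions
      |USING parquet
      |OPTIONS (path '/tmp/persisted_partitions')
      |AS SELECT 1 AS key, 'a' AS value
    """.stripMargin)

  assert(sqlContext.table("persisted_partitions").count() == 1L)
}
// DROP TABLE IF EXISTS persisted_partitions has run by this point.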

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 40 additions & 9 deletions
@@ -66,11 +66,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     def schemaStringFromParts: Option[String] = {
       table.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
         val parts = (0 until numParts.toInt).map { index =>
-          val part = table.properties.get(s"spark.sql.sources.schema.part.${index}").orNull
+          val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
           if (part == null) {
             throw new AnalysisException(
-              s"Could not read schema from the metastore because it is corrupted " +
-                s"(missing part ${index} of the schema).")
+              "Could not read schema from the metastore because it is corrupted " +
+                s"(missing part $index of the schema, $numParts parts are expected).")
           }
 
           part
@@ -89,6 +89,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val userSpecifiedSchema =
       schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
 
+    // We only need the names here, since the userSpecifiedSchema we loaded from the
+    // metastore already contains the partition columns. We can always get the data types
+    // of the partitioning columns from userSpecifiedSchema.
+    val partitionColumns = table.partitionColumns.map(_.name)
+
     // It does not appear that the ql client for the metastore has a way to enumerate all the
     // SerDe properties directly...
     val options = table.serdeProperties
@@ -97,7 +102,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     ResolvedDataSource(
       hive,
       userSpecifiedSchema,
-      Array.empty[String],
+      partitionColumns.toArray,
       table.properties("spark.sql.sources.provider"),
       options)
 
@@ -111,8 +116,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   override def refreshTable(databaseName: String, tableName: String): Unit = {
     // refreshTable does not eagerly reload the cache. It just invalidate the cache.
     // Next time when we use the table, it will be populated in the cache.
-    // Since we also cache ParquetRealtions converted from Hive Parquet tables and
-    // adding converted ParquetRealtions into the cache is not defined in the load function
+    // Since we also cache ParquetRelations converted from Hive Parquet tables and
+    // adding converted ParquetRelations into the cache is not defined in the load function
     // of the cache (instead, we add the cache entry in convertToParquetRelation),
     // it is better at here to invalidate the cache to avoid confusing waring logs from the
     // cache loader (e.g. cannot find data source provider, which is only defined for
@@ -133,21 +138,47 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   def createDataSourceTable(
       tableName: String,
       userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
       provider: String,
       options: Map[String, String],
       isExternal: Boolean): Unit = {
     val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
     val tableProperties = new scala.collection.mutable.HashMap[String, String]
     tableProperties.put("spark.sql.sources.provider", provider)
+
+    // Saves optional user specified schema. Serialized JSON schema string may be too long to be
+    // stored into a single metastore SerDe property. In this case, we split the JSON string and
+    // store each part as a separate SerDe property.
     if (userSpecifiedSchema.isDefined) {
       val threshold = conf.schemaStringLengthThreshold
       val schemaJsonString = userSpecifiedSchema.get.json
       // Split the JSON string.
       val parts = schemaJsonString.grouped(threshold).toSeq
       tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
       parts.zipWithIndex.foreach { case (part, index) =>
-        tableProperties.put(s"spark.sql.sources.schema.part.${index}", part)
+        tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+      }
+    }
+
+    val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
+      val fields = partitionColumns.map(col => schema(col))
+      fields.map { field =>
+        HiveColumn(
+          name = field.name,
+          hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
+          comment = "")
+      }.toSeq
+    }.getOrElse {
+      if (partitionColumns.length > 0) {
+        // The table does not have a specified schema, which means that the schema will be
+        // inferred when we load the table. So, we are not expecting partition columns and
+        // we will discover partitions when we load the table. However, if there are
+        // specified partition columns, we simply ignore them and provide a warning message.
+        logWarning(
+          s"The schema and partitions of table $tableName will be inferred when it is loaded. " +
+            s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
      }
+      Seq.empty[HiveColumn]
    }
 
     val tableType = if (isExternal) {
@@ -163,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       specifiedDatabase = Option(dbName),
       name = tblName,
       schema = Seq.empty,
-      partitionColumns = Seq.empty,
+      partitionColumns = metastorePartitionColumns,
       tableType = tableType,
       properties = tableProperties.toMap,
       serdeProperties = options))
@@ -199,7 +230,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     val dataSourceTable =
       cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
     // Then, if alias is specified, wrap the table with a Subquery using the alias.
-    // Othersie, wrap the table with a Subquery using the table name.
+    // Otherwise, wrap the table with a Subquery using the table name.
     val withAlias =
       alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
         Subquery(tableIdent.last, dataSourceTable))
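
For reference, the schema persistence round trip implemented above can be sketched in isolation: the schema's JSON form is chopped into numbered table properties on save, and schemaStringFromParts concatenates and re-parses them on load. The threshold below is an illustrative assumption; the real code reads it from conf.schemaStringLengthThreshold.

import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType),
  StructField("part", StringType)))

// Split the JSON representation into chunks no longer than the threshold.
val threshold = 4000  // illustrative value
val parts = schema.json.grouped(threshold).toSeq

// Save: one property for the number of parts, plus one property per chunk.
val props = scala.collection.mutable.HashMap[String, String](
  "spark.sql.sources.schema.numParts" -> parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
  props += s"spark.sql.sources.schema.part.$index" -> part
}

// Load: concatenate the chunks in order and parse the JSON back into a StructType.
val numParts = props("spark.sql.sources.schema.numParts").toInt
val json = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
val restored = DataType.fromJson(json).asInstanceOf[StructType]
assert(restored == schema)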

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 2 additions & 0 deletions
@@ -146,6 +146,7 @@ case class CreateMetastoreDataSource(
     hiveContext.catalog.createDataSourceTable(
       tableName,
       userSpecifiedSchema,
+      Array.empty[String],
       provider,
       optionsWithPath,
       isExternal)
@@ -244,6 +245,7 @@ case class CreateMetastoreDataSourceAsSelect(
     hiveContext.catalog.createDataSourceTable(
       tableName,
       Some(resolved.relation.schema),
+      partitionColumns,
       provider,
       optionsWithPath,
       isExternal)

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 23 additions & 12 deletions
@@ -48,15 +48,14 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
   def createRelation(
       sqlContext: SQLContext,
       paths: Array[String],
-      schema: Option[StructType],
+      dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
     assert(
       sqlContext.isInstanceOf[HiveContext],
       "The ORC data source can only be used with HiveContext.")
 
-    val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty[Partition]))
-    OrcRelation(paths, parameters, schema, partitionSpec)(sqlContext)
+    new OrcRelation(paths, dataSchema, None, partitionColumns, parameters)(sqlContext)
   }
 }
 
@@ -136,23 +135,35 @@ private[orc] class OrcOutputWriter(
 }
 
 @DeveloperApi
-private[sql] case class OrcRelation(
+private[sql] class OrcRelation(
     override val paths: Array[String],
-    parameters: Map[String, String],
-    maybeSchema: Option[StructType] = None,
-    maybePartitionSpec: Option[PartitionSpec] = None)(
+    maybeDataSchema: Option[StructType],
+    maybePartitionSpec: Option[PartitionSpec],
+    override val userDefinedPartitionColumns: Option[StructType],
+    parameters: Map[String, String])(
     @transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec)
   with Logging {
 
-  override val dataSchema: StructType = maybeSchema.getOrElse {
+  private[sql] def this(
+      paths: Array[String],
+      maybeDataSchema: Option[StructType],
+      maybePartitionSpec: Option[PartitionSpec],
+      parameters: Map[String, String])(
+      sqlContext: SQLContext) = {
+    this(
+      paths,
+      maybeDataSchema,
+      maybePartitionSpec,
+      maybePartitionSpec.map(_.partitionColumns),
+      parameters)(sqlContext)
+  }
+
+  override val dataSchema: StructType = maybeDataSchema.getOrElse {
     OrcFileOperator.readSchema(
       paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
   }
 
-  override def userDefinedPartitionColumns: Option[StructType] =
-    maybePartitionSpec.map(_.partitionColumns)
-
   override def needConversion: Boolean = false
 
   override def equals(other: Any): Boolean = other match {
@@ -169,7 +180,7 @@ private[sql] case class OrcRelation(
       paths.toSet,
       dataSchema,
       schema,
-      maybePartitionSpec)
+      partitionColumns)
   }
 
   override def buildScan(
