
Commit f9fab59

Fix the possible inconsistency between file schema and catalog schema for file-based data source relation.
1 parent daace60 commit f9fab59

File tree: 6 files changed (+253, -9 lines)
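
The inconsistency addressed here shows up when a catalog (e.g. the Hive Metastore) records column names that differ from the names physically stored in the data files, which is common for ORC files written through Hive. Below is a hypothetical spark-shell sketch of the scenario; the table names, the /tmp/orc location, and an enabled spark.sql.hive.convertMetastoreOrc conversion are assumptions, and the authoritative version is the OrcQuerySuite test added at the bottom of this commit.

// Hypothetical illustration; assumes a SparkSession `spark` with Hive support and
// spark.sql.hive.convertMetastoreOrc=true.
spark.sql("CREATE TABLE t1(value STRING) PARTITIONED BY (key INT) STORED AS ORC LOCATION '/tmp/orc'")
spark.sql("INSERT INTO TABLE t1 PARTITION (key = 0) SELECT 'foo'")

// A second table over the same files, declaring a different column name.
spark.sql("CREATE EXTERNAL TABLE t2(value2 STRING) PARTITIONED BY (key INT) STORED AS ORC LOCATION '/tmp/orc'")
spark.sql("ALTER TABLE t2 ADD PARTITION (key = 0)")

// With this commit, the converted relation exposes the Metastore schema (value2, key) and the
// predicate on value2 is rewritten to the column name stored in the files, so (0, 'foo') is returned.
spark.sql("SELECT key, value2 FROM t2 WHERE key = 0 AND value2 = 'foo'").show()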
Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.types.{StructField, StructType}

/**
 * An interface for mapping between two different schemas. For relations that are backed by
 * files, the schema inferred from the files might differ from the schema stored in the catalog.
 * In that case, this interface maps fields between the two schemas.
 */
private[sql] trait SchemaMapping {
  /** The schema inferred from the files. */
  val dataSchema: StructType

  /** The schema used for partitioning. */
  val partitionSchema: StructType

  /** The schema fetched from the catalog. */
  val catalogSchema: StructType

  require(catalogSchema.length == 0 ||
    dataSchema.length + partitionSchema.length == catalogSchema.length,
    s"The data schema in files: $dataSchema plus the partition schema: $partitionSchema " +
      s"should have the same number of fields as the schema in the catalog: $catalogSchema.")

  /** Returns the corresponding catalog field for the given data field. */
  def lookForFieldFromDataField(field: StructField): Option[StructField] = {
    if (catalogSchema.fields.length == 0) {
      None
    } else {
      dataSchema.getFieldIndex(field.name).map { idx =>
        catalogSchema.fields(idx)
      }
    }
  }

  /** Returns the corresponding data field for the given catalog field. */
  def lookForFieldFromCatalogField(field: StructField): Option[StructField] = {
    catalogSchema.getFieldIndex(field.name).map { idx =>
      dataSchema.fields(idx)
    }
  }

  /** Returns the corresponding data field for the given catalog field name. */
  def lookForFieldFromCatalogField(fieldName: String): Option[StructField] = {
    catalogSchema.getFieldIndex(fieldName).map { idx =>
      dataSchema.fields(idx)
    }
  }

  /**
   * Transforms the attributes in the given expression, which is based on the catalog schema,
   * into the corresponding attributes of the schema stored in the files.
   */
  def transformExpressionToUseDataSchema(expr: Expression): Expression = {
    expr transform {
      case a: AttributeReference =>
        lookForFieldFromCatalogField(a.name).map { field =>
          a.withName(field.name)
        }.getOrElse(a)
    }
  }
}
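
A minimal usage sketch of the trait above (not part of the commit). ExampleMapping is a hypothetical helper mirroring the SchemaMappingRelation defined in SchemaMappingSuite later in this commit, and it must be compiled in the org.apache.spark.sql.execution package because the trait is private[sql]. The key point is that fields are matched purely by position, so the catalog names and the on-file names are free to differ; the require above enforces that the data fields plus the partition fields line up one-to-one with the catalog fields.

package org.apache.spark.sql.execution

import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Hypothetical helper; mirrors SchemaMappingRelation in SchemaMappingSuite.
case class ExampleMapping(
    dataSchema: StructType,
    partitionSchema: StructType,
    catalogSchema: StructType) extends SchemaMapping

object SchemaMappingExample {
  def main(args: Array[String]): Unit = {
    val mapping = ExampleMapping(
      dataSchema = StructType(
        StructField("_col1", IntegerType) :: StructField("_col2", LongType) :: Nil),
      partitionSchema = StructType(StructField("part", IntegerType) :: Nil),
      catalogSchema = StructType(
        StructField("f1", IntegerType) ::
        StructField("f2", LongType) ::
        StructField("part", IntegerType) :: Nil))

    // Catalog name -> on-file field, resolved by index: Some(StructField(_col2,LongType,...))
    println(mapping.lookForFieldFromCatalogField("f2"))
    // On-file field -> catalog field: Some(StructField(f1,IntegerType,...))
    println(mapping.lookForFieldFromDataField(StructField("_col1", IntegerType)))
  }
}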

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 16 additions & 2 deletions

@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructType

 /**
  * A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -85,11 +86,22 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

-      val dataColumns =
+      // Map the data schema to the schema in the catalog, if one is given.
+      val relationSchema = StructType(fsRelation.dataSchema.flatMap { field =>
+        fsRelation.lookForFieldFromDataField(field)
+      })
+
+      val dataColumns = if (relationSchema.length != fsRelation.dataSchema.length) {
        l.resolve(fsRelation.dataSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      } else {
+        l.resolve(relationSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
+      }

       // Partition keys are not available in the statistics of the files.
+      // Data filters are based on the schema stored in the files, which might differ from the
+      // relation's output schema, so the filters need to be transformed.
       val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
+        .map(filter => fsRelation.transformExpressionToUseDataSchema(filter))

       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters
@@ -105,7 +117,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         dataColumns
           .filter(requiredAttributes.contains)
           .filterNot(partitionColumns.contains)
-      val prunedDataSchema = readDataColumns.toStructType
+      val prunedDataSchema = StructType(readDataColumns.toStructType.map { field =>
+        fsRelation.lookForFieldFromCatalogField(field).getOrElse(field)
+      })
       logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")

       val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
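
For concreteness, here is a small self-contained sketch (not part of the commit) of the two translations the strategy now performs: column pruning maps the required catalog columns back to the on-file field names, and catalog-name predicates are rewritten with transformExpressionToUseDataSchema before push-down. PlanMapping is a hypothetical stand-in for the relation, mirroring the helper in SchemaMappingSuite.

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical stand-in for HadoopFsRelation; only the three schemas matter here.
case class PlanMapping(
    dataSchema: StructType,
    partitionSchema: StructType,
    catalogSchema: StructType) extends SchemaMapping

object FileSourceStrategyMappingExample {
  def main(args: Array[String]): Unit = {
    val mapping = PlanMapping(
      dataSchema = StructType(
        StructField("_col1", IntegerType) :: StructField("_col2", StringType) :: Nil),
      partitionSchema = StructType(StructField("part", IntegerType) :: Nil),
      catalogSchema = StructType(
        StructField("f1", IntegerType) ::
        StructField("f2", StringType) ::
        StructField("part", IntegerType) :: Nil))

    // Pruning: the query only needs catalog column "f2", so the scan must read "_col2".
    val prunedDataSchema = StructType(
      Seq("f2").flatMap(name => mapping.lookForFieldFromCatalogField(name)))
    println(prunedDataSchema)

    // Filtering: a predicate on the catalog name "f1" is rewritten to reference "_col1".
    val catalogFilter = EqualTo(AttributeReference("f1", IntegerType)(), Literal(1))
    println(mapping.transformExpressionToUseDataSchema(catalogFilter))
  }
}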

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

Lines changed: 17 additions & 6 deletions
@@ -32,9 +32,9 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.execution.{FileRelation, SchemaMapping}
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration

 /**
@@ -132,6 +132,7 @@ abstract class OutputWriter {
  * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
  * @param fileFormat A file format that can be used to read and write the data in files.
  * @param options Configuration used when reading / writing data.
+ * @param catalogSchema The schema fetched from the catalog (e.g. the Hive Metastore), if any.
  */
 case class HadoopFsRelation(
     sparkSession: SparkSession,
@@ -140,14 +141,24 @@
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String],
+    catalogSchema: StructType = new StructType())
+  extends BaseRelation with FileRelation with SchemaMapping {

   override def sqlContext: SQLContext = sparkSession.sqlContext

   val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
+    // If a catalog schema is given, use it as the relation's output instead of the schema
+    // inferred from the files.
+    val schemaColumns = if (catalogSchema.fields.length == 0) {
+      dataSchema
+    } else {
+      catalogSchema
+    }
+
+    val schemaColumnNames = schemaColumns.map(_.name.toLowerCase).toSet
+    StructType(schemaColumns ++ partitionSchema.filterNot { column =>
+      schemaColumnNames.contains(column.name.toLowerCase)
     })
   }
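
As a quick worked example of the schema selection above (not part of the commit; a sketch using plain StructTypes), with schemas like those in the test suite below the relation's output takes the catalog names, and the partition column is appended only when the chosen schema does not already contain it:

import org.apache.spark.sql.types.{IntegerType, LongType, StructType}

val dataSchema      = new StructType().add("_col1", IntegerType).add("_col2", LongType)
val partitionSchema = new StructType().add("part", IntegerType)
val catalogSchema   = new StructType().add("f1", IntegerType).add("f2", LongType).add("part", IntegerType)

// Mirrors HadoopFsRelation.schema: prefer the catalog schema when it is non-empty.
val schemaColumns = if (catalogSchema.fields.length == 0) dataSchema else catalogSchema
val names = schemaColumns.map(_.name.toLowerCase).toSet
val schema = StructType(schemaColumns ++ partitionSchema.filterNot(c => names.contains(c.name.toLowerCase)))
// schema is (f1 INT, f2 BIGINT, part INT): the catalog names win and "part" is not duplicated.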

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types._

case class SchemaMappingRelation(
    val dataSchema: StructType,
    val partitionSchema: StructType,
    val catalogSchema: StructType) extends SchemaMapping

class SchemaMappingSuite extends SparkFunSuite {

  val dataSchema = StructType(
    StructField("_col1", IntegerType) ::
    StructField("_col2", LongType) ::
    StructField("_col3", BooleanType) :: Nil)

  val partitionSchema = StructType(
    StructField("part", IntegerType) :: Nil)

  val catalogSchema = StructType(
    StructField("f1", IntegerType) ::
    StructField("f2", LongType) ::
    StructField("f3", BooleanType) ::
    StructField("part", IntegerType) :: Nil)

  val relation = SchemaMappingRelation(dataSchema, partitionSchema, catalogSchema)

  test("looking for data schema field with given catalog field name") {
    val col1 = relation.lookForFieldFromCatalogField("f1").get
    assert(col1.name == "_col1" && col1.dataType == IntegerType)

    val col2 = relation.lookForFieldFromCatalogField("f2").get
    assert(col2.name == "_col2" && col2.dataType == LongType)

    val col3 = relation.lookForFieldFromCatalogField("f3").get
    assert(col3.name == "_col3" && col3.dataType == BooleanType)

    assert(relation.lookForFieldFromCatalogField("f4").isEmpty)
  }

  test("relation with empty catalog schema") {
    val relationWithoutCatalogSchema = SchemaMappingRelation(dataSchema,
      partitionSchema, new StructType())
    assert(relationWithoutCatalogSchema.lookForFieldFromCatalogField("f1").isEmpty)
  }

  test("data schema must match catalog schema in length if catalog schema is not empty") {
    val catalogSchema = StructType(StructField("f1", IntegerType) :: Nil)
    val e = intercept[RuntimeException] {
      SchemaMappingRelation(dataSchema, partitionSchema, catalogSchema)
    }
    assert(e.getMessage.contains("should have the same number of fields"))
  }

  test("transform expression of catalog schema fields to use data schema fields") {
    val attr = AttributeReference("f1", IntegerType)()
    val expr = EqualTo(attr, Literal(1))
    val expected = EqualTo(attr.withName("_col1"), Literal(1))
    assert(relation.transformExpressionToUseDataSchema(expr) == expected)
  }
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 3 additions & 1 deletion
@@ -246,6 +246,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
@@ -308,7 +309,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           dataSchema = inferredSchema,
           bucketSpec = bucketSpec,
           fileFormat = defaultSource,
-          options = options)
+          options = options,
+          catalogSchema = metastoreSchema)

         val created = LogicalRelation(
           relation,

sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala

Lines changed: 57 additions & 0 deletions
@@ -497,4 +497,61 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
     }
   }
+
+  test("ORC conversion when metastore schema does not match schema stored in ORC files") {
+    withTempView("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
+        withTable("dummy_orc", "dummy_orc2") {
+          withTempPath { dir =>
+            val path = dir.getCanonicalPath
+
+            // Create a Metastore ORC table and insert data into it.
+            spark.sql(
+              s"""
+                |CREATE TABLE dummy_orc(value STRING)
+                |PARTITIONED BY (key INT)
+                |STORED AS ORC
+                |LOCATION '$path'
+              """.stripMargin)
+
+            spark.sql(
+              s"""
+                |INSERT INTO TABLE dummy_orc
+                |PARTITION(key=0)
+                |SELECT value FROM single
+              """.stripMargin)
+
+            val df = spark.sql("SELECT key, value FROM dummy_orc WHERE key=0")
+            checkAnswer(df, singleRowDF)
+
+            // Create a second Metastore ORC table with a different schema over the same files.
+            spark.sql(
+              s"""
+                |CREATE EXTERNAL TABLE dummy_orc2(value2 STRING)
+                |PARTITIONED BY (key INT)
+                |STORED AS ORC
+                |LOCATION '$path'
+              """.stripMargin)
+
+            spark.sql("ALTER TABLE dummy_orc2 ADD PARTITION(key=0)")
+
+            // The output of the relation is the schema from the Metastore, not the files.
+            val df2 = spark.sql("SELECT key, value2 FROM dummy_orc2 WHERE key=0 AND value2='foo'")
+            checkAnswer(df2, singleRowDF)
+
+            val queryExecution = df2.queryExecution
+            queryExecution.analyzed.collectFirst {
+              case _: LogicalRelation => ()
+            }.getOrElse {
+              fail(s"Expecting the query plan to convert orc to data sources, " +
+                s"but got:\n$queryExecution")
+            }
+          }
+        }
+      }
+    }
+  }
 }
