Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,14 @@ private[sql] object ParquetRelation2 extends Logging {
|${parquetSchema.prettyJson}
""".stripMargin

assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)

assert(metastoreSchema.size == mergedParquetSchema.size, schemaConflictMessage)

val ordinalMap = metastoreSchema.zipWithIndex.map {
case (field, index) => field.name.toLowerCase -> index
}.toMap
val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
val reorderedParquetSchema = mergedParquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))

StructType(metastoreSchema.zip(reorderedParquetSchema).map {
// Uses Parquet field names but retains Metastore data types.
Expand All @@ -774,6 +776,32 @@ private[sql] object ParquetRelation2 extends Logging {
})
}

/**
* Returns the original schema from the Parquet file with any missing nullable fields from the
* Hive Metastore schema merged in.
*
* When constructing a DataFrame from a collection of structured data, the resulting object has
* a schema corresponding to the union of the fields present in each element of the collection.
* Spark SQL simply assigns a null value to any field that isn't present for a particular row.
* In some cases, it is possible that a given table partition stored as a Parquet file doesn't
* contain a particular nullable field in its schema despite that field being present in the
* table schema obtained from the Hive Metastore. This method returns a schema representing the
* Parquet file schema along with any additional nullable fields from the Metastore schema
* merged in.
*/
private[parquet] def mergeMissingNullableFields(
metastoreSchema: StructType,
parquetSchema: StructType): StructType = {
val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
val missingFields = metastoreSchema
.map(_.name.toLowerCase)
.diff(parquetSchema.map(_.name.toLowerCase))
.map(fieldMap(_))
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}


// TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
// However, we are already using Catalyst expressions for partition pruning and predicate
// push-down here...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("UPPERCase", IntegerType, nullable = true))))
}

// Conflicting field count
// Conflicting field count due to additional fields in Parquet schema
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
Expand All @@ -230,4 +230,50 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructType(Seq(StructField("lowerCase", BinaryType))))
}
}

test("merge missing nullable fields from Metastore schema") {
// Standard case: Metastore schema contains additional nullable fields not present
// in the Parquet file schema.
assertResult(
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}

// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = false))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))

// Merge should fail if the Parquet file schema contains fields not present in the
// Metastore schema.
assert(intercept[Throwable] {
ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true))),
StructType(Seq(
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("fourthField", StringType, nullable = true))))
}.getMessage.contains("detected conflicting schemas"))
}
}