
Commit 5e04f45

Adam Budde authored and Cheng Lian committed
[SPARK-6538][SQL] Add missing nullable Metastore fields when merging a Parquet schema
Opening to replace #5188.

When Spark SQL infers a schema for a DataFrame, it takes the union of all field types present in the structured source data (e.g. an RDD of JSON data). When the source data for a row doesn't define a particular field of the DataFrame's schema, a null value is simply assumed for that field. This workflow makes it very easy to construct tables and query over a set of structured data with a nonuniform schema.

However, this behavior is not consistent in some cases when dealing with Parquet files and an external table managed by an external Hive metastore. In our particular use case, we use Spark Streaming to parse and transform our input data and then apply a window function to save an arbitrary-sized batch of data as a Parquet file, which itself is added as a partition to an external Hive table via an *"ALTER TABLE... ADD PARTITION..."* statement. Since our input data is nonuniform, it is expected that not every partition batch will contain every field present in the table's schema obtained from the Hive metastore. As such, the schema of some of our Parquet files may not contain the same set of fields present in the full metastore schema. In such cases, it seems natural that Spark SQL would simply assume null values for any fields missing from the partition's Parquet file, provided those fields are specified as nullable by the metastore schema.

This is not the case in the current implementation of ParquetRelation2. The **mergeMetastoreParquetSchema()** method used to reconcile differences between a Parquet file's schema and a schema retrieved from the Hive metastore will raise an exception if the Parquet file doesn't match the same set of fields specified by the metastore. This pull request alters the behavior of **mergeMetastoreParquetSchema()** by having it first add any nullable fields from the metastore schema to the Parquet file schema if they aren't already present there.
Author: Adam Budde <[email protected]>

Closes #5214 from budde/nullable-fields and squashes the following commits:

a52d378 [Adam Budde] Refactor ParquetSchemaSuite.scala for cases now permitted by SPARK-6471 and SPARK-6538
9041bfa [Adam Budde] Add missing nullable Metastore fields when merging a Parquet schema

(cherry picked from commit 5909f09)
Signed-off-by: Cheng Lian <[email protected]>
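The merging rule described above can be sketched independently of Spark. The following is a minimal standalone model, where the hypothetical `Field(name, nullable)` record stands in for Spark SQL's `StructField`; it mirrors the case-insensitive name matching used by the real method:

```scala
// Hypothetical stand-in for Spark SQL's StructField: just a name and a nullable flag.
case class Field(name: String, nullable: Boolean)

object MergeSketch {
  // Append any Metastore-only *nullable* fields to the Parquet schema,
  // matching field names case-insensitively. Non-nullable missing fields
  // are intentionally left out, so they still surface as a schema conflict.
  def mergeMissingNullableFields(
      metastoreSchema: Seq[Field],
      parquetSchema: Seq[Field]): Seq[Field] = {
    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
    val missingFields = metastoreSchema
      .map(_.name.toLowerCase)
      .diff(parquetSchema.map(_.name.toLowerCase))
      .map(fieldMap(_))
      .filter(_.nullable)
    parquetSchema ++ missingFields
  }

  def main(args: Array[String]): Unit = {
    val metastore = Seq(Field("a", nullable = true), Field("b", nullable = true), Field("c", nullable = true))
    val parquet   = Seq(Field("A", nullable = true), Field("B", nullable = true))
    val merged = mergeMissingNullableFields(metastore, parquet)
    // Parquet casing is preserved; only the missing nullable "c" is appended.
    println(merged.map(_.name).mkString(","))  // prints "A,B,c"
  }
}
```

Note that the Parquet schema's own field order and casing are kept; missing nullable fields are simply appended at the end, matching the `StructType(parquetSchema ++ missingFields)` expression in the patch.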
1 parent 7006858 commit 5e04f45

File tree

2 files changed: +66 additions, -6 deletions


sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 30 additions & 2 deletions

```diff
@@ -765,12 +765,14 @@ private[sql] object ParquetRelation2 extends Logging {
         |${parquetSchema.prettyJson}
       """.stripMargin

-    assert(metastoreSchema.size <= parquetSchema.size, schemaConflictMessage)
+    val mergedParquetSchema = mergeMissingNullableFields(metastoreSchema, parquetSchema)
+
+    assert(metastoreSchema.size <= mergedParquetSchema.size, schemaConflictMessage)

     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = parquetSchema.sortBy(f =>
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
       ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

     StructType(metastoreSchema.zip(reorderedParquetSchema).map {
@@ -782,6 +784,32 @@ private[sql] object ParquetRelation2 extends Logging {
     })
   }

+  /**
+   * Returns the original schema from the Parquet file with any missing nullable fields from the
+   * Hive Metastore schema merged in.
+   *
+   * When constructing a DataFrame from a collection of structured data, the resulting object has
+   * a schema corresponding to the union of the fields present in each element of the collection.
+   * Spark SQL simply assigns a null value to any field that isn't present for a particular row.
+   * In some cases, it is possible that a given table partition stored as a Parquet file doesn't
+   * contain a particular nullable field in its schema despite that field being present in the
+   * table schema obtained from the Hive Metastore. This method returns a schema representing the
+   * Parquet file schema along with any additional nullable fields from the Metastore schema
+   * merged in.
+   */
+  private[parquet] def mergeMissingNullableFields(
+      metastoreSchema: StructType,
+      parquetSchema: StructType): StructType = {
+    val fieldMap = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
+    val missingFields = metastoreSchema
+      .map(_.name.toLowerCase)
+      .diff(parquetSchema.map(_.name.toLowerCase))
+      .map(fieldMap(_))
+      .filter(_.nullable)
+    StructType(parquetSchema ++ missingFields)
+  }
+
   // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
   // However, we are already using Catalyst expressions for partition pruning and predicate
   // push-down here...
```
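After the merge, the patched code sorts the (merged) Parquet fields back into Metastore order via `ordinalMap`. A standalone sketch of that `sortBy` technique, using plain name strings rather than `StructField`s for brevity:

```scala
object ReorderSketch {
  // Sort Parquet field names into the Metastore's declared order. Names the
  // Metastore doesn't know about get the sentinel ordinal metastore.size + 1,
  // so they sort past the end while keeping their relative order (sortBy is stable).
  def reorder(metastore: Seq[String], parquet: Seq[String]): Seq[String] = {
    val ordinalMap = metastore.zipWithIndex.map {
      case (name, index) => name.toLowerCase -> index
    }.toMap
    parquet.sortBy(f => ordinalMap.getOrElse(f.toLowerCase, metastore.size + 1))
  }

  def main(args: Array[String]): Unit = {
    // The Parquet file wrote the columns in a different order and casing.
    println(reorder(Seq("id", "name", "value"), Seq("Name", "ID", "extra")).mkString(","))
    // prints "ID,Name,extra"
  }
}
```

This ordering step is what lets the subsequent `metastoreSchema.zip(reorderedParquetSchema)` pair up corresponding fields positionally.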

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

Lines changed: 36 additions & 4 deletions

```diff
@@ -226,22 +226,54 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
         StructField("UPPERCase", IntegerType, nullable = true))))
   }

-    // Conflicting field count
+    // Metastore schema contains additional non-nullable fields.
     assert(intercept[Throwable] {
       ParquetRelation2.mergeMetastoreParquetSchema(
         StructType(Seq(
           StructField("uppercase", DoubleType, nullable = false),
-          StructField("lowerCase", BinaryType))),
+          StructField("lowerCase", BinaryType, nullable = false))),

         StructType(Seq(
           StructField("UPPERCase", IntegerType, nullable = true))))
     }.getMessage.contains("detected conflicting schemas"))

-    // Conflicting field names
+    // Conflicting non-nullable field names
     intercept[Throwable] {
       ParquetRelation2.mergeMetastoreParquetSchema(
-        StructType(Seq(StructField("lower", StringType))),
+        StructType(Seq(StructField("lower", StringType, nullable = false))),
         StructType(Seq(StructField("lowerCase", BinaryType))))
     }
   }
+
+  test("merge missing nullable fields from Metastore schema") {
+    // Standard case: Metastore schema contains additional nullable fields not present
+    // in the Parquet file schema.
+    assertResult(
+      StructType(Seq(
+        StructField("firstField", StringType, nullable = true),
+        StructField("secondField", StringType, nullable = true),
+        StructField("thirdfield", StringType, nullable = true)))) {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("firstfield", StringType, nullable = true),
+          StructField("secondfield", StringType, nullable = true),
+          StructField("thirdfield", StringType, nullable = true))),
+        StructType(Seq(
+          StructField("firstField", StringType, nullable = true),
+          StructField("secondField", StringType, nullable = true))))
+    }
+
+    // Merge should fail if the Metastore contains any additional fields that are not
+    // nullable.
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("firstfield", StringType, nullable = true),
+          StructField("secondfield", StringType, nullable = true),
+          StructField("thirdfield", StringType, nullable = false))),
+        StructType(Seq(
+          StructField("firstField", StringType, nullable = true),
+          StructField("secondField", StringType, nullable = true))))
+    }.getMessage.contains("detected conflicting schemas"))
+  }
 }
```
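The failing case in the new test follows directly from the size assertion in `mergeMetastoreParquetSchema`: a missing *non-nullable* field is never merged in, so the merged Parquet schema stays smaller than the Metastore schema and the "detected conflicting schemas" assertion fires. A hypothetical standalone model of that check (again using a simplified `Field` in place of `StructField`):

```scala
object ConflictSketch {
  // Hypothetical stand-in for StructField: just a name and a nullable flag.
  case class Field(name: String, nullable: Boolean)

  // Returns true when the schemas would trip the "detected conflicting schemas"
  // assertion: the merge only adds *nullable* Metastore-only fields, so any
  // non-nullable missing field leaves the merged schema smaller than the
  // Metastore schema.
  def conflicts(metastore: Seq[Field], parquet: Seq[Field]): Boolean = {
    val missingNullable = metastore
      .filter(f => !parquet.exists(_.name.equalsIgnoreCase(f.name)))
      .filter(_.nullable)
    val mergedSize = parquet.size + missingNullable.size
    metastore.size > mergedSize
  }

  def main(args: Array[String]): Unit = {
    val parquet = Seq(Field("firstField", nullable = true))
    // Nullable missing field: merged in, no conflict.
    println(conflicts(Seq(Field("firstfield", true), Field("extra", true)), parquet))   // prints "false"
    // Non-nullable missing field: can't be merged, conflict detected.
    println(conflicts(Seq(Field("firstfield", true), Field("extra", false)), parquet))  // prints "true"
  }
}
```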
