Commit c956201

Revert "[SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged Parquet schema for filter predicate pushdown"
This reverts commit bf49368.
1 parent cf461d0 commit c956201

6 files changed (+137, -22 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 2 deletions

@@ -582,9 +582,9 @@ class Analyzer(
     // |- view2 (defaultDatabase = db2)
     // |- view3 (defaultDatabase = db3)
     // |- view4 (defaultDatabase = db4)
-    // In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
+    // In this case, the view `view1` is a nested view, it directly references `table2`、`view2`
     // and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
-    // relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
+    // relations `table2`、`view2`、`view4` using the default database `db1`, and look up the
     // relation `view3` using the default database `db2`.
     //
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 13 additions & 2 deletions

@@ -402,6 +402,13 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 @InterfaceStability.Stable
 object StructType extends AbstractDataType {
 
+  /**
+   * A key used in field metadata to indicate that the field comes from the result of merging
+   * two different StructTypes that do not always contain the field. That is to say, the field
+   * might be missing (optional) from one of the StructTypes.
+   */
+  private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
+
   override private[sql] def defaultConcreteType: DataType = new StructType
 
   override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -462,6 +469,8 @@ object StructType extends AbstractDataType {
 
     case (StructType(leftFields), StructType(rightFields)) =>
       val newFields = ArrayBuffer.empty[StructField]
+      // This metadata will record the fields that only exist in one of two StructTypes
+      val optionalMeta = new MetadataBuilder()
 
       val rightMapped = fieldsMap(rightFields)
       leftFields.foreach {
@@ -473,7 +482,8 @@ object StructType extends AbstractDataType {
               nullable = leftNullable || rightNullable)
           }
           .orElse {
-            Some(leftField)
+            optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
+            Some(leftField.copy(metadata = optionalMeta.build()))
           }
           .foreach(newFields += _)
       }
@@ -482,7 +492,8 @@ object StructType extends AbstractDataType {
       rightFields
         .filterNot(f => leftMapped.get(f.name).nonEmpty)
        .foreach { f =>
-          newFields += f
+          optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
+          newFields += f.copy(metadata = optionalMeta.build())
         }
 
       StructType(newFields)
sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala

Lines changed: 49 additions & 0 deletions

@@ -134,6 +134,55 @@ class DataTypeSuite extends SparkFunSuite {
     assert(mapped === expected)
   }
 
+  test("merge where right is empty") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(List())
+    val merged = left.merge(right)
+
+    assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
+    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+  }
+
+  test("merge where left is empty") {
+
+    val left = StructType(List())
+
+    val right = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val merged = left.merge(right)
+
+    assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
+    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+  }
+
+  test("merge where both are non-empty") {
+    val left = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) :: Nil)
+
+    val right = StructType(
+      StructField("c", LongType) :: Nil)
+
+    val expected = StructType(
+      StructField("a", LongType) ::
+      StructField("b", FloatType) ::
+      StructField("c", LongType) :: Nil)
+
+    val merged = left.merge(right)
+
+    assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
+    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+    assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+  }
+
   test("merge where right contains type conflict") {
     val left = StructType(
       StructField("a", LongType) ::

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 8 additions & 2 deletions

@@ -109,7 +109,9 @@ class ParquetFileFormat
 
     // We want to clear this temporary metadata from saving into Parquet file.
    // This metadata is only useful for detecting optional columns when pushdowning filters.
-    ParquetWriteSupport.setSchema(dataSchema, conf)
+    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -295,7 +297,11 @@ class ParquetFileFormat
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
-    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+    // We want to clear this temporary metadata from saving into Parquet file.
+    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+      requiredSchema).asInstanceOf[StructType]
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
    hadoopConf.setBoolean(
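The private[sql] helper StructType.removeMetadata used above strips the temporary key before the schema is handed to ParquetWriteSupport, so the "_OPTIONAL_" marker does not end up in written Parquet files (the ParquetFilterSuite test below asserts this after a round trip). A rough, public-API-only sketch of that kind of stripping; the object and method names are invented, and recursing only into nested StructTypes is an assumption for illustration, not the exact implementation:

import org.apache.spark.sql.types._

// Illustration only: approximates clearing the temporary "_OPTIONAL_" key from a
// schema, using only public StructType/Metadata API.
object StripOptionalMetadataSketch {
  def strip(dt: DataType, key: String = "_OPTIONAL_"): DataType = dt match {
    case StructType(fields) =>
      StructType(fields.map { f =>
        f.copy(
          dataType = strip(f.dataType, key),
          // removing an absent key is a no-op, so untagged fields pass through unchanged
          metadata = new MetadataBuilder().withMetadata(f.metadata).remove(key).build())
      })
    case other => other // non-struct types carry no field metadata to clean
  }
}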

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 11 additions & 2 deletions

@@ -169,14 +169,23 @@ private[parquet] object ParquetFilters {
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies.
+   * Returns a map from name of the column to the data type, if predicate push down applies
+   * (i.e. not an optional field).
+   *
+   * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
+   * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
+   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
+   * Here we filter out such fields.
    */
   private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.map(f => f.name -> f.dataType).toMap
+      fields.filter { f =>
+        !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
+          !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
+      }.map(f => f.name -> f.dataType).toMap
     case _ => Map.empty[String, DataType]
   }
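In other words, after this revert a column is a pushdown candidate only if it is not flagged as optional. A standalone restatement of that eligibility check, for reference; the object and method names are invented, and the literal "_OPTIONAL_" key is spelled out because metadataKeyForOptionalField is private[sql]:

import org.apache.spark.sql.types._

// Illustration only: mirrors the filter getFieldMap applies before building
// Parquet predicates, so columns that exist in only some files are skipped.
object PushdownEligibilitySketch {
  def pushableColumns(schema: StructType): Map[String, DataType] =
    schema.fields
      .filter { f =>
        // keep the field unless it is explicitly marked "_OPTIONAL_" = true
        !f.metadata.contains("_OPTIONAL_") || !f.metadata.getBoolean("_OPTIONAL_")
      }
      .map(f => f.name -> f.dataType)
      .toMap
}

With a merged schema like the one in the DataTypeSuite tests above, none of a, b, c would appear in the returned map, so a filter such as c = 1 is evaluated by Spark after the scan instead of being handed to parquet-mr, avoiding the PARQUET-389 exception described in the comment.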

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 54 additions & 14 deletions

@@ -368,36 +368,76 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }
 
 
-  test("Filter applied on merged Parquet schema with new column should work") {
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
     import testImplicits._
     Seq("true", "false").map { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
         withTempPath { dir =>
-          val path1 = s"${dir.getCanonicalPath}/table1"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
-          val path2 = s"${dir.getCanonicalPath}/table2"
-          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)
-
-          // No matter "c = 1" gets pushed down or not, this query should work without exception.
-          val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
+          val pathOne = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+          val pathTwo = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+          // If the "c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits. This is a Parquet issue (PARQUET-389).
+          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
           checkAnswer(
             df,
             Row(1, "1", null))
 
-          val path3 = s"${dir.getCanonicalPath}/table3"
+          // The fields "a" and "c" only exist in one Parquet file.
+          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathThree = s"${dir.getCanonicalPath}/table3"
+          df.write.parquet(pathThree)
+
+          // We will remove the temporary metadata when writing Parquet file.
+          val schema = spark.read.parquet(pathThree).schema
+          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          val pathFour = s"${dir.getCanonicalPath}/table4"
           val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-          dfStruct.select(struct("a").as("s")).write.parquet(path3)
+          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
 
-          val path4 = s"${dir.getCanonicalPath}/table4"
+          val pathFive = s"${dir.getCanonicalPath}/table5"
           val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-          dfStruct2.select(struct("c").as("s")).write.parquet(path4)
+          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
 
-          // No matter "s.c = 1" gets pushed down or not, this query should work without exception.
-          val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
+          // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
+          // Parquet emits.
+          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
             .selectExpr("s")
           checkAnswer(dfStruct3, Row(Row(null, 1)))
+
+          // The fields "s.a" and "s.c" only exist in one Parquet file.
+          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
+          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
+
+          val pathSix = s"${dir.getCanonicalPath}/table6"
+          dfStruct3.write.parquet(pathSix)
+
+          // We will remove the temporary metadata when writing Parquet file.
+          val forPathSix = spark.read.parquet(pathSix).schema
+          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+          // sanity test: make sure optional metadata field is not wrongly set.
+          val pathSeven = s"${dir.getCanonicalPath}/table7"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+          val pathEight = s"${dir.getCanonicalPath}/table8"
+          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+          checkAnswer(
+            df2,
+            Row(1, "1"))
+
+          // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
         }
       }
     }
