
Commit d5ca09d

viirya authored and cmonkey committed
[SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged Parquet schema for filter predicate pushdown
## What changes were proposed in this pull request?

A metadata key was introduced earlier to mark the optional columns in a merged Parquet schema for filter predicate pushdown. Since we have upgraded to Parquet 1.8.2, which includes the fix for pushdown on optional columns, this metadata is no longer needed.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#16756 from viirya/remove-optional-metadata.
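For context, a minimal sketch of the scenario this change affects (not part of the commit; the paths, session setup, and table contents are illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession

// Two Parquet directories share column "b" but differ in "a" vs. "c"; with schema
// merging on, a filter on "c" touches a column that exists in only one of the files.
val spark = SparkSession.builder().master("local[*]").appName("merged-schema").getOrCreate()
import spark.implicits._

(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet("/tmp/t1")
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet("/tmp/t2")

spark.read
  .option("mergeSchema", "true")
  .parquet("/tmp/t1", "/tmp/t2")
  .filter("c = 1")          // pushdown on a column present in only one file
  .show()
```

Before Parquet 1.8.2, pushing `c = 1` down to a file that lacks column `c` could hit PARQUET-389, which is why the optional-column metadata existed; with the upgrade the filter can be pushed down safely.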
1 parent 4e540b8 commit d5ca09d

6 files changed, +22 -137 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 2 deletions
@@ -575,9 +575,9 @@ class Analyzer(
     // |- view2 (defaultDatabase = db2)
     // |- view3 (defaultDatabase = db3)
     // |- view4 (defaultDatabase = db4)
-    // In this case, the view `view1` is a nested view, it directly references `table2``view2`
+    // In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
     // and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
-    // relations `table2``view2``view4` using the default database `db1`, and look up the
+    // relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
     // relation `view3` using the default database `db2`.
     //
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which

sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 2 additions & 13 deletions
@@ -402,13 +402,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 @InterfaceStability.Stable
 object StructType extends AbstractDataType {
 
-  /**
-   * A key used in field metadata to indicate that the field comes from the result of merging
-   * two different StructTypes that do not always contain the field. That is to say, the field
-   * might be missing (optional) from one of the StructTypes.
-   */
-  private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
-
   override private[sql] def defaultConcreteType: DataType = new StructType
 
   override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -463,8 +456,6 @@
 
     case (StructType(leftFields), StructType(rightFields)) =>
       val newFields = ArrayBuffer.empty[StructField]
-      // This metadata will record the fields that only exist in one of two StructTypes
-      val optionalMeta = new MetadataBuilder()
 
       val rightMapped = fieldsMap(rightFields)
       leftFields.foreach {
@@ -476,8 +467,7 @@
               nullable = leftNullable || rightNullable)
           }
           .orElse {
-            optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
-            Some(leftField.copy(metadata = optionalMeta.build()))
+            Some(leftField)
           }
           .foreach(newFields += _)
       }
@@ -486,8 +476,7 @@
       rightFields
         .filterNot(f => leftMapped.get(f.name).nonEmpty)
         .foreach { f =>
-          optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
-          newFields += f.copy(metadata = optionalMeta.build())
+          newFields += f
         }
 
       StructType(newFields)
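The effect of the `StructType.merge` change above, sketched in the spirit of the removed `DataTypeSuite` tests (note that `merge` is `private[sql]`, so this only compiles from test code inside the `org.apache.spark.sql.types` package; the assertions are assumptions about the new behavior, not copied from the patch):

```scala
import org.apache.spark.sql.types._

val left = StructType(StructField("a", LongType) :: StructField("b", FloatType) :: Nil)
val right = StructType(StructField("c", LongType) :: Nil)

// Fields present on only one side are now carried over unchanged.
val merged = left.merge(right)

assert(merged.fieldNames.toSeq == Seq("a", "b", "c"))
// No "_OPTIONAL_" marker is attached to the merged fields anymore.
assert(merged.forall(_.metadata == Metadata.empty))
```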

sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala

Lines changed: 0 additions & 49 deletions
@@ -132,55 +132,6 @@ class DataTypeSuite extends SparkFunSuite {
     assert(mapped === expected)
   }
 
-  test("merge where right is empty") {
-    val left = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val right = StructType(List())
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
-  test("merge where left is empty") {
-
-    val left = StructType(List())
-
-    val right = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
-  test("merge where both are non-empty") {
-    val left = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val right = StructType(
-      StructField("c", LongType) :: Nil)
-
-    val expected = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) ::
-      StructField("c", LongType) :: Nil)
-
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
   test("merge where right contains type conflict") {
     val left = StructType(
       StructField("a", LongType) ::

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 2 additions & 8 deletions
@@ -109,9 +109,7 @@ class ParquetFileFormat
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchema, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -307,11 +305,7 @@
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
-      // We want to clear this temporary metadata from saving into Parquet file.
-      // This metadata is only useful for detecting optional columns when pushdowning filters.
-      val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-        requiredSchema).asInstanceOf[StructType]
-      ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+      ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 
       // Sets flags for `CatalystSchemaConverter`
       hadoopConf.setBoolean(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 2 additions & 11 deletions
@@ -169,23 +169,14 @@ private[parquet] object ParquetFilters {
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies
-   * (i.e. not an optional field).
-   *
-   * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
-   * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
-   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
-   * Here we filter out such fields.
+   * Returns a map from name of the column to the data type, if predicate push down applies.
    */
   private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.filter { f =>
-        !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
-          !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType).toMap
+      fields.map(f => f.name -> f.dataType).toMap
     case _ => Map.empty[String, DataType]
   }
 
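The simplified `getFieldMap` now maps every root-level field name to its data type and no longer filters out optional fields. A rough standalone equivalent for illustration (this is not the private Spark method itself, just a sketch of the same logic):

```scala
import org.apache.spark.sql.types._

def fieldMap(dataType: DataType): Map[String, DataType] = dataType match {
  // Only root-level fields are considered; nested fields are still not flattened
  // and are not candidates for pushdown.
  case StructType(fields) => fields.map(f => f.name -> f.dataType).toMap
  case _ => Map.empty[String, DataType]
}

val schema = new StructType()
  .add("a", IntegerType)
  .add("s", new StructType().add("c", IntegerType))

fieldMap(schema)  // Map("a" -> IntegerType, "s" -> StructType(StructField("c", IntegerType, ...)))
```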

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 14 additions & 54 deletions
@@ -368,76 +368,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }
 
 
-  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+  test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
     Seq("true", "false").map { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
         withTempPath { dir =>
-          val pathOne = s"${dir.getCanonicalPath}/table1"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
-          val pathTwo = s"${dir.getCanonicalPath}/table2"
-          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
-          // If the "c = 1" filter gets pushed down, this query will throw an exception which
-          // Parquet emits. This is a Parquet issue (PARQUET-389).
-          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+          val path1 = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
+          val path2 = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)
+
+          // No matter "c = 1" gets pushed down or not, this query should work without exception.
+          val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
           checkAnswer(
             df,
             Row(1, "1", null))
 
-          // The fields "a" and "c" only exist in one Parquet file.
-          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-          val pathThree = s"${dir.getCanonicalPath}/table3"
-          df.write.parquet(pathThree)
-
-          // We will remove the temporary metadata when writing Parquet file.
-          val schema = spark.read.parquet(pathThree).schema
-          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-          val pathFour = s"${dir.getCanonicalPath}/table4"
+          val path3 = s"${dir.getCanonicalPath}/table3"
           val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+          dfStruct.select(struct("a").as("s")).write.parquet(path3)
 
-          val pathFive = s"${dir.getCanonicalPath}/table5"
+          val path4 = s"${dir.getCanonicalPath}/table4"
           val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+          dfStruct2.select(struct("c").as("s")).write.parquet(path4)
 
-          // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
-          // Parquet emits.
-          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
+          // No matter "s.c = 1" gets pushed down or not, this query should work without exception.
+          val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
             .selectExpr("s")
           checkAnswer(dfStruct3, Row(Row(null, 1)))
-
-          // The fields "s.a" and "s.c" only exist in one Parquet file.
-          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
-          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-          val pathSix = s"${dir.getCanonicalPath}/table6"
-          dfStruct3.write.parquet(pathSix)
-
-          // We will remove the temporary metadata when writing Parquet file.
-          val forPathSix = spark.read.parquet(pathSix).schema
-          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-          // sanity test: make sure optional metadata field is not wrongly set.
-          val pathSeven = s"${dir.getCanonicalPath}/table7"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
-          val pathEight = s"${dir.getCanonicalPath}/table8"
-          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
-          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
-          checkAnswer(
-            df2,
-            Row(1, "1"))
-
-          // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
-          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
-          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
         }
       }
     }
