
Commit 823c473

liancheng authored and CodingCat committed
[SPARK-10005] [SQL] Fixes schema merging for nested structs
In case of schema merging, we only handled first level fields when converting Parquet groups to `InternalRow`s. Nested struct fields are not properly handled. For example, the schema of a Parquet file to be read can be:

```
message individual {
  required group f1 {
    optional binary f11 (utf8);
  }
}
```

while the global schema is:

```
message global {
  required group f1 {
    optional binary f11 (utf8);
    optional int32 f12;
  }
}
```

This PR fixes this issue by padding missing fields when creating actual converters.

Author: Cheng Lian <[email protected]>

Closes apache#8228 from liancheng/spark-10005/nested-schema-merging.
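To make the merge scenario above concrete, here is a minimal sketch (not part of this commit) that writes two Parquet files whose nested struct schemas differ and reads them back with schema merging enabled. It targets the Spark 1.5-era `SQLContext` API; the method name, path argument, and the nested-tuple field names Spark assigns (`_1`, `_2`) are illustrative assumptions, not anything taken from the patch:

```scala
// Not part of this commit: a hedged sketch of the failure scenario described above.
import org.apache.spark.sql.{SQLContext, SaveMode}

def reproduce(sqlContext: SQLContext, path: String): Unit = {
  import sqlContext.implicits._

  // First file: the nested struct column has a single field.
  Seq(Tuple1(Tuple1(1))).toDF("f1").write.mode(SaveMode.Append).parquet(path)

  // Second file: the nested struct gains an extra field, so the merged (global)
  // schema is a superset of the first file's schema.
  Seq(Tuple1((1, 2))).toDF("f1").write.mode(SaveMode.Append).parquet(path)

  // With schema merging enabled, rows from the first file need the missing nested
  // field padded with null -- the behavior this patch adds to the converters.
  sqlContext.read.option("mergeSchema", "true").parquet(path).show()
}
```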
1 parent fa01cd9 commit 823c473

File tree

4 files changed (+112, -22 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 15 additions & 4 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+  // Called after `init()` when initializing Parquet record reader.
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
           // available if the target file is written by Spark SQL.
           .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
       }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet schema")
+        logInfo("Catalyst schema not available, falling back to Parquet schema")
         toCatalyst.convert(parquetRequestedSchema)
       }
 
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+    logInfo {
+      s"""Going to read the following fields from the Parquet file:
+         |
+         |Parquet form:
+         |$parquetRequestedSchema
+         |
+         |Catalyst form:
+         |$catalystRequestedSchema
+       """.stripMargin
+    }
+
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
 
+  // Called before `prepareForRead()` when initializing Parquet record reader.
   override def init(context: InitContext): ReadContext = {
     val conf = context.getConfiguration
 
     // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its the metadata.
+    // schema of this file from its metadata.
     val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
 
     // Optional schema of requested columns, in the form of a string serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
       maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
     new ReadContext(parquetRequestedSchema, metadata)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 65 additions & 5 deletions
@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
 import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
 }
 
 /**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as root converter.
+ * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter.  Take the following Parquet type as an example:
+ * {{{
+ *   message root {
+ *     required int32 f1;
+ *     optional group f2 {
+ *       required double f21;
+ *       optional binary f22 (utf8);
+ *     }
+ *   }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
+ *   - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ *   - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ *     - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ *     - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
  *
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
+ * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
+ *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
+ *       fields of the global schema.  The key difference is that, in case of schema merging,
+ *       [[parquetType]] can be a subset of [[catalystType]].  For example, it's possible to have
+ *       the following [[catalystType]]:
+ *       {{{
+ *       new StructType()
+ *         .add("f1", IntegerType, nullable = false)
+ *         .add("f2", StringType, nullable = true)
+ *         .add("f3", new StructType()
+ *           .add("f31", DoubleType, nullable = false)
+ *           .add("f32", IntegerType, nullable = true)
+ *           .add("f33", StringType, nullable = true), nullable = false)
+ *       }}}
+ *       and the following [[parquetType]] (`f2` and `f32` are missing):
+ *       {{{
+ *       message root {
+ *         required int32 f1;
+ *         required group f3 {
+ *           required double f31;
+ *           optional binary f33 (utf8);
+ *         }
+ *       }
+ *       }}}
+ *
 * @param parquetType Parquet schema of Parquet records
 * @param catalystType Spark SQL schema that corresponds to the Parquet record type
 * @param updater An updater which propagates converted field values to the parent container
@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.zip(catalystType).zipWithIndex.map {
+    // In case of schema merging, `parquetType` can be a subset of `catalystType`.  We need to pad
+    // those missing fields and create converters for them, although values of these fields are
+    // always null.
+    val paddedParquetFields = {
+      val parquetFields = parquetType.getFields
+      val parquetFieldNames = parquetFields.map(_.getName).toSet
+      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
+
+      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
+      // creating the schema converter here, since values of missing fields are always null.
+      val toParquet = new CatalystSchemaConverter()
+
+      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
+        catalystType.indexWhere(_.name == f.getName)
+      }
+    }
+
+    paddedParquetFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
        newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
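The pad-and-reorder step above is the heart of the fix. The following self-contained sketch (plain Scala collections with hypothetical field names, not the actual Parquet/Catalyst types) illustrates the same idea in isolation: fields missing from the file's schema are appended, then everything is sorted back into the global schema's field order.

```scala
// Simplified illustration of the padding logic; not code from the patch.
object PaddingSketch extends App {
  /** Pads `fileFields` with fields that exist only in `globalFields`, preserving global order. */
  def padFields(fileFields: Seq[String], globalFields: Seq[String]): Seq[String] = {
    val present = fileFields.toSet
    val missing = globalFields.filterNot(present)
    (fileFields ++ missing).sortBy(f => globalFields.indexOf(f))
  }

  // Mirrors the Scaladoc example above: `f2` is missing from the file's schema.
  println(padFields(Seq("f1", "f3"), Seq("f1", "f2", "f3"))) // List(f1, f2, f3)
}
```

In the real converter, each padded field still gets a converter of the right type, but since the file contains no data for it, the corresponding cell of the output row simply stays null.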

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 3 additions & 12 deletions
@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
       followParquetFormatSpec = conf.followParquetFormatSpec)
 
   def this(conf: Configuration) = this(
-    assumeBinaryIsString =
-      conf.getBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
-    assumeInt96IsTimestamp =
-      conf.getBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
-    followParquetFormatSpec =
-      conf.getBoolean(
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 29 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       assert(Decimal("67123.45") === Decimal(decimal))
     }
   }
+
+  test("SPARK-10005 Schema merging for nested struct") {
+    val sqlContext = _sqlContext
+    import sqlContext.implicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      def append(df: DataFrame): Unit = {
+        df.write.mode(SaveMode.Append).parquet(path)
+      }
+
+      // Note that both the following two DataFrames contain a single struct column with multiple
+      // nested fields.
+      append((1 to 2).map(i => Tuple1((i, i))).toDF())
+      append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
+
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+        checkAnswer(
+          sqlContext.read.option("mergeSchema", "true").parquet(path),
+          Seq(
+            Row(Row(1, 1, null)),
+            Row(Row(2, 2, null)),
+            Row(Row(1, 1, 1)),
+            Row(Row(2, 2, 2))))
+      }
+    }
+  }
 }
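As a side note on how the test constructs its data, the sketch below (not part of the patch; the object and method names are hypothetical) shows that a `Tuple1` wrapping a nested tuple becomes a single struct column whose fields Spark names `_1`, `_2`, and so on, which is exactly the shape schema merging has to reconcile:

```scala
// Illustrative only: demonstrates the tuple-to-struct mapping the test relies on.
import org.apache.spark.sql.SQLContext

object TupleStructSketch {
  def printShapes(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    // One column `_1` of struct type with nested fields `_1` and `_2`.
    (1 to 2).map(i => Tuple1((i, i))).toDF().printSchema()

    // Same column, but the inner tuple adds a third nested field `_3`;
    // rows from the first DataFrame must have it padded with null on read.
    (1 to 2).map(i => Tuple1((i, i, i))).toDF().printSchema()
  }
}
```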
