Commit b6f4526

Fixes test failures
Parent: fb4d67d

File tree: 3 files changed, +168 -213 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 14 additions & 26 deletions
@@ -30,7 +30,7 @@ import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{MapType, ArrayType, DataType, StructType}
+import org.apache.spark.sql.types._
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -195,33 +195,21 @@ private[parquet] object CatalystReadSupport {
   }
 
   private def clipParquetRecord(parquetRecord: GroupType, structType: StructType): GroupType = {
-    val resultFields = {
-      val parquetFields = parquetRecord.getFields.asScala
-
-      val clippedFields = {
-        val catalystFieldNames = structType.fieldNames.toSet
-        parquetFields.collect {
-          case f if catalystFieldNames.contains(f.getName) =>
-            clipParquetType(f, structType.apply(f.getName).dataType)
-        }
+    val tailoredFields = {
+      val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+      val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+      structType.map { f =>
+        parquetFieldMap
+          .get(f.name)
+          .map(clipParquetType(_, f.dataType))
+          .getOrElse(toParquet.convertField(f))
       }
-
-      val paddedFields = {
-        val parquetFieldNames = parquetFields.map(_.getName).toSet
-        val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
-        structType.collect {
-          case f if !parquetFieldNames.contains(f.name) =>
-            toParquet.convertField(f)
-        }
-      }
-
-      (clippedFields ++ paddedFields).sortBy(_.getName)
     }
 
-    Types
-      .buildGroup(parquetRecord.getRepetition)
-      .as(parquetRecord.getOriginalType)
-      .addFields(resultFields: _*)
-      .named(parquetRecord.getName)
+    new GroupType(
+      parquetRecord.getRepetition,
+      parquetRecord.getName,
+      parquetRecord.getOriginalType,
+      tailoredFields.asJava)
   }
 }
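
The tailoring idea in the hunk above can be illustrated with a minimal, self-contained Scala sketch. The names below (ClipSketch, tailor, the plain (name, type) pairs) are hypothetical simplifications standing in for Parquet's GroupType/Type and Spark's CatalystSchemaConverter, not the actual APIs. The point is the lookup-or-convert step: each field requested by the Catalyst struct reuses the matching Parquet field when the file has one and falls back to a field derived from the Catalyst type otherwise, and the result keeps the Catalyst field order rather than being sorted by name as in the removed code.

object ClipSketch {
  // Model a schema field as a (name, type description) pair -- a stand-in for
  // Parquet Type / Catalyst StructField, used only for illustration.
  type Field = (String, String)

  // For every field requested by the Catalyst schema, reuse the matching Parquet
  // field when the file has one, otherwise fall back to the Catalyst type. The
  // result follows the Catalyst field order, one entry per requested field.
  def tailor(parquetFields: Seq[Field], catalystFields: Seq[Field]): Seq[Field] = {
    val parquetFieldMap = parquetFields.toMap
    catalystFields.map { case (name, catalystType) =>
      parquetFieldMap.get(name)
        .map(parquetType => (name, parquetType))
        .getOrElse((name, catalystType))
    }
  }

  def main(args: Array[String]): Unit = {
    val parquet  = Seq("f1" -> "int32", "f3" -> "binary (utf8)") // file is missing f2
    val catalyst = Seq("f1" -> "IntegerType", "f2" -> "StringType", "f3" -> "StringType")
    // Prints: List((f1,int32), (f2,StringType), (f3,binary (utf8)))
    println(tailor(parquet, catalyst))
  }
}

Because the tailored record follows the Catalyst field order, the row converter in the next file no longer needs its own padding and reordering step.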

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 1 addition & 50 deletions
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
- *       fields of the global schema. The key difference is that, in case of schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]]. For example, it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
- *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad
-    // those missing fields and create converters for them, although values of these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields.asScala
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table schema. " +
-        "Please enable schema merging by setting \"mergeSchema\" to true when load " +
-        "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
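
With the requested Parquet schema now tailored to the Catalyst schema by CatalystReadSupport, the two field lists have the same length and order, which is what makes the direct zip above safe. A minimal sketch of that pairing follows (hypothetical names, not Spark's converter classes), assuming both sides are already aligned:

object RowConverterSketch {
  def main(args: Array[String]): Unit = {
    // After tailoring, the requested Parquet fields and the Catalyst fields are
    // aligned by position, so zipping them pairs matching fields.
    val parquetFields  = Seq("f1: int32", "f2: binary (utf8)")
    val catalystFields = Seq("f1: IntegerType", "f2: StringType")
    val currentRow = new Array[String](catalystFields.length)

    // Build one "converter" per field pair; each one writes into the `ordinal`-th
    // cell of the current row, mirroring the RowUpdater(currentRow, ordinal) call.
    val fieldConverters = parquetFields.zip(catalystFields).zipWithIndex.map {
      case ((parquetField, catalystField), ordinal) =>
        (value: String) => currentRow(ordinal) = s"$value ($parquetField -> $catalystField)"
    }

    fieldConverters(0)("42")
    fieldConverters(1)("hello")
    println(currentRow.mkString(", ")) // both cells filled by their ordinal converter
  }
}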
