Skip to content

Commit c2b5b3c

Browse files
Marcelo Vanzin and liancheng
authored and committed
[SPARK-16632][SQL] Respect Hive schema when merging parquet schema.
When Hive (or at least certain versions of Hive) creates parquet files containing tinyint or smallint columns, it stores them as int32, but doesn't annotate the parquet field as containing the corresponding int8 / int16 data. When Spark reads those files using the vectorized reader, it follows the parquet schema for these fields, but when actually reading the data it tries to use the type fetched from the metastore, and then fails because data has been loaded into the wrong fields in OnHeapColumnVector. So instead of blindly trusting the parquet schema, check whether the Catalyst-provided schema disagrees with it, and adjust the types so that the necessary metadata is present when loading the data into the ColumnVector instance. Tested with unit tests and with tests that create byte / short columns in Hive and try to read them from Spark. Author: Marcelo Vanzin <[email protected]> Closes #14272 from vanzin/SPARK-16632. (cherry picked from commit 75146be) Signed-off-by: Cheng Lian <[email protected]>
1 parent 6f209c8 commit c2b5b3c

File tree

2 files changed

+57
-0
lines changed

2 files changed

+57
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
2626
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
2727
import org.apache.parquet.io.api.RecordMaterializer
2828
import org.apache.parquet.schema._
29+
import org.apache.parquet.schema.OriginalType._
30+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
2931
import org.apache.parquet.schema.Type.Repetition
3032

3133
import org.apache.spark.internal.Logging
@@ -116,6 +118,12 @@ private[parquet] object ParquetReadSupport {
116118
}
117119

118120
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
121+
val primName = if (parquetType.isPrimitive()) {
122+
parquetType.asPrimitiveType().getPrimitiveTypeName()
123+
} else {
124+
null
125+
}
126+
119127
catalystType match {
120128
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
121129
// Only clips array types with nested type as element type.
@@ -130,6 +138,16 @@ private[parquet] object ParquetReadSupport {
130138
case t: StructType =>
131139
clipParquetGroup(parquetType.asGroupType(), t)
132140

141+
case _: ByteType if primName == INT32 =>
142+
// SPARK-16632: Handle case where Hive stores bytes in a int32 field without specifying
143+
// the original type.
144+
Types.primitive(INT32, parquetType.getRepetition()).as(INT_8).named(parquetType.getName())
145+
146+
case _: ShortType if primName == INT32 =>
147+
// SPARK-16632: Handle case where Hive stores shorts in a int32 field without specifying
148+
// the original type.
149+
Types.primitive(INT32, parquetType.getRepetition()).as(INT_16).named(parquetType.getName())
150+
133151
case _ =>
134152
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
135153
// to be mapped to desired user-space types. So UDTs shouldn't participate schema merging.

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,4 +1573,43 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
15731573
| }
15741574
|}
15751575
""".stripMargin)
1576+
1577+
testSchemaClipping(
1578+
"int32 parquet field with byte schema field",
1579+
1580+
parquetSchema =
1581+
"""message root {
1582+
| optional int32 value;
1583+
|}
1584+
""".stripMargin,
1585+
1586+
catalystSchema =
1587+
new StructType()
1588+
.add("value", ByteType, nullable = true),
1589+
1590+
expectedSchema =
1591+
"""message root {
1592+
| optional int32 value (INT_8);
1593+
|}
1594+
""".stripMargin)
1595+
1596+
testSchemaClipping(
1597+
"int32 parquet field with short schema field",
1598+
1599+
parquetSchema =
1600+
"""message root {
1601+
| optional int32 value;
1602+
|}
1603+
""".stripMargin,
1604+
1605+
catalystSchema =
1606+
new StructType()
1607+
.add("value", ShortType, nullable = true),
1608+
1609+
expectedSchema =
1610+
"""message root {
1611+
| optional int32 value (INT_16);
1612+
|}
1613+
""".stripMargin)
1614+
15761615
}

0 commit comments

Comments (0)