
Commit 02149ff

[SPARK-8848] [SQL] Refactors Parquet write path to follow parquet-format
This PR refactors the Parquet write path to follow the parquet-format spec. It's a successor of PR #7679, but with fewer non-essential changes.

Major changes include:

1. Replaces `RowWriteSupport` and `MutableRowWriteSupport` with `CatalystWriteSupport`

   - Writes Parquet data using the standard layout defined in parquet-format. Specifically, we are now writing ...

     - ... arrays and maps in the standard 3-level structure with proper annotations and field names
     - ... decimals as `INT32` and `INT64` whenever possible, taking `FIXED_LEN_BYTE_ARRAY` as the final fallback

   - Supports a legacy mode which is compatible with Spark 1.4 and prior versions. The legacy mode is off by default, and can be turned on by flipping the SQL option `spark.sql.parquet.writeLegacyFormat` to `true`.

   - Eliminates per-value data type dispatching costs via prebuilt composed writer functions

2. Cleans up the last pieces of the old Parquet support code

As pointed out by rxin previously, we probably want to rename all those `Catalyst*` Parquet classes to `Parquet*` for clarity, but I'd like to do that in a follow-up PR to minimize code review noise in this one.

Author: Cheng Lian <[email protected]>

Closes #8988 from liancheng/spark-8848/standard-parquet-write-path.
1 parent 2816c89 commit 02149ff
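
For context, the `spark.sql.parquet.writeLegacyFormat` option mentioned above can be exercised as in the following minimal sketch (the SQLContext, data, and output path are placeholders, not part of this commit):

```scala
import org.apache.spark.sql.SQLContext

// Hypothetical usage: opt back into the Spark 1.4-compatible Parquet layout.
// With the new default (false), the standard parquet-format layout is written instead.
def writeWithLegacyLayout(sqlContext: SQLContext, outputPath: String): Unit = {
  sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
  val df = sqlContext.range(0, 10)  // placeholder DataFrame
  df.write.parquet(outputPath)
}
```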

18 files changed: +709 -693 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala

Lines changed: 3 additions & 1 deletion
@@ -108,7 +108,9 @@ final class Decimal extends Ordered[Decimal] with Serializable {
    */
   def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
     this.decimalVal = decimal.setScale(scale, ROUNDING_MODE)
-    require(decimalVal.precision <= precision, "Overflowed precision")
+    require(
+      decimalVal.precision <= precision,
+      s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
     this.longVal = 0L
     this._precision = precision
     this._scale = scale
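
To illustrate the sharpened error message, a small sketch (assuming the usual `Decimal.apply(BigDecimal, precision, scale)` factory, which delegates to `set`):

```scala
import org.apache.spark.sql.types.Decimal

// "12345.67" has precision 7, which does not fit DecimalType(4, 2); the require
// above now fails with "Decimal precision 7 exceeds max precision 4" rather than
// the old, uninformative "Overflowed precision".
Decimal(BigDecimal("12345.67"), 4, 2)
```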

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 2 additions & 3 deletions
@@ -292,10 +292,9 @@ private[spark] object SQLConf {

   val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
     key = "spark.sql.parquet.writeLegacyFormat",
-    defaultValue = Some(true),
+    defaultValue = Some(false),
     doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa.",
-    isPublic = false)
+      "Spark SQL schema and vice versa.")

   val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
     key = "spark.sql.parquet.output.committer.class",

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 33 additions & 2 deletions
@@ -95,7 +95,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
       """.stripMargin
     }

-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+    new CatalystRecordMaterializer(
+      parquetRequestedSchema,
+      CatalystReadSupport.expandUDT(catalystRequestedSchema))
   }
 }

@@ -110,7 +112,10 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }

   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
@@ -271,4 +276,30 @@ private[parquet] object CatalystReadSupport {
         .getOrElse(toParquet.convertField(f))
     }
   }
+
+  def expandUDT(schema: StructType): StructType = {
+    def expand(dataType: DataType): DataType = {
+      dataType match {
+        case t: ArrayType =>
+          t.copy(elementType = expand(t.elementType))
+
+        case t: MapType =>
+          t.copy(
+            keyType = expand(t.keyType),
+            valueType = expand(t.valueType))
+
+        case t: StructType =>
+          val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+          t.copy(fields = expandedFields)
+
+        case t: UserDefinedType[_] =>
+          t.sqlType
+
+        case t =>
+          t
+      }
+    }
+
+    expand(schema).asInstanceOf[StructType]
+  }
 }
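
To make the new `expandUDT` step concrete, here is a sketch of the transformation it performs. The `PointUDT` below is a hypothetical user-defined type, not part of Spark or this commit, and its serialization hooks are left as stubs since only `sqlType` matters for schema expansion:

```scala
import org.apache.spark.sql.types._

case class Point(x: Double, y: Double)

// Hypothetical UDT whose Catalyst representation is a struct of two doubles.
class PointUDT extends UserDefinedType[Point] {
  override def sqlType: DataType =
    StructType(Seq(StructField("x", DoubleType), StructField("y", DoubleType)))
  override def serialize(obj: Any): Any = ???          // not needed for this sketch
  override def deserialize(datum: Any): Point = ???    // not needed for this sketch
  override def userClass: Class[Point] = classOf[Point]
}

// A requested schema that mentions the UDT ...
val withUDT = StructType(Seq(StructField("location", new PointUDT)))

// ... is rewritten by expandUDT so the row converter only ever sees plain
// Catalyst types; conceptually:
//   StructType(Seq(StructField("location",
//     StructType(Seq(StructField("x", DoubleType), StructField("y", DoubleType))))))
```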

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala

Lines changed: 32 additions & 27 deletions
@@ -27,7 +27,6 @@ import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
-import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}

 import org.apache.spark.Logging
@@ -114,7 +113,8 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * any "parent" container.
  *
  * @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record type
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
+ *                     types should have been expanded.
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class CatalystRowConverter(
@@ -133,6 +133,12 @@ private[parquet] class CatalystRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)

+  assert(
+    !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
+    s"""User-defined types in Catalyst schema should have already been expanded:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
   logDebug(
     s"""Building row converter for the following schema:
        |
@@ -268,13 +274,6 @@ private[parquet] class CatalystRowConverter(
         override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
       })

-      case t: UserDefinedType[_] =>
-        val catalystTypeForUDT = t.sqlType
-        val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
-        val field = StructField("udt", catalystTypeForUDT, nullable)
-        val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
-        newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
-
       case _ =>
         throw new RuntimeException(
           s"Unable to create Parquet converter for data type ${catalystType.json}")
@@ -340,30 +339,36 @@
     val scale = decimalType.scale

     if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-      // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying
-      // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
-      // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
-      val buffer = value.toByteBuffer
-      val bytes = buffer.array()
-      val start = buffer.position()
-      val end = buffer.limit()
-
-      var unscaled = 0L
-      var i = start
-
-      while (i < end) {
-        unscaled = (unscaled << 8) | (bytes(i) & 0xff)
-        i += 1
-      }
-
-      val bits = 8 * (end - start)
-      unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+      // Constructs a `Decimal` with an unscaled `Long` value if possible.
+      val unscaled = binaryToUnscaledLong(value)
       Decimal(unscaled, precision, scale)
     } else {
       // Otherwise, resorts to an unscaled `BigInteger` instead.
       Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
     }
   }
+
+  private def binaryToUnscaledLong(binary: Binary): Long = {
+    // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+    // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+    // copying it.
+    val buffer = binary.toByteBuffer
+    val bytes = buffer.array()
+    val start = buffer.position()
+    val end = buffer.limit()
+
+    var unscaled = 0L
+    var i = start
+
+    while (i < end) {
+      unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+      i += 1
+    }
+
+    val bits = 8 * (end - start)
+    unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+    unscaled
+  }
 }

 /**
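
The sign-extension trick that `binaryToUnscaledLong` factors out can be seen on a tiny standalone example (illustration only, not code from this commit):

```scala
// Decode a big-endian two's-complement FIXED_LEN_BYTE_ARRAY / BINARY value.
// The 2-byte value 0xFF 0x85 is the unscaled decimal -123.
val bytes = Array(0xFF.toByte, 0x85.toByte)

var unscaled = 0L
for (b <- bytes) {
  unscaled = (unscaled << 8) | (b & 0xff)  // accumulate big-endian bytes: 0xFF85 == 65413
}

// Shift the 16 significant bits up to the top of the Long and arithmetically back
// down, so the sign bit of the original 2-byte value fills the upper 48 bits.
val bits = 8 * bytes.length                // 16
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
assert(unscaled == -123L)
```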

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 21 additions & 25 deletions
@@ -121,7 +121,7 @@ private[parquet] class CatalystSchemaConverter(
       val precision = field.getDecimalMetadata.getPrecision
       val scale = field.getDecimalMetadata.getScale

-      CatalystSchemaConverter.analysisRequire(
+      CatalystSchemaConverter.checkConversionRequirement(
         maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
         s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")

@@ -155,7 +155,7 @@ private[parquet] class CatalystSchemaConverter(
         }

       case INT96 =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           assumeInt96IsTimestamp,
           "INT96 is not supported unless it's interpreted as timestamp. " +
             s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
@@ -197,11 +197,11 @@ private[parquet] class CatalystSchemaConverter(
       //
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
       case LIST =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")

         val repeatedType = field.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           repeatedType.isRepetition(REPEATED), s"Invalid list type $field")

         if (isElementType(repeatedType, field.getName)) {
@@ -217,17 +217,17 @@ private[parquet] class CatalystSchemaConverter(
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
       // scalastyle:on
       case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1 && !field.getType(0).isPrimitive,
           s"Invalid map type: $field")

         val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
           s"Invalid map type: $field")

         val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyType.isPrimitive,
           s"Map key type is expected to be a primitive type, but found: $keyType")

@@ -299,7 +299,10 @@ private[parquet] class CatalystSchemaConverter(
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
    */
   def convert(catalystSchema: StructType): MessageType = {
-    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(catalystSchema.map(convertField): _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }

   /**
@@ -347,10 +350,10 @@ private[parquet] class CatalystSchemaConverter(
       // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
       //
       // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
-      // timestamp in Impala for some historical reasons, it's not recommended to be used for any
-      // other types and will probably be deprecated in future Parquet format spec. That's the
-      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
-      // are both logical types annotating `INT64`.
+      // timestamp in Impala for some historical reasons. It's not recommended to be used for any
+      // other types and will probably be deprecated in some future version of parquet-format spec.
+      // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
       //
       // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
       // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
@@ -361,7 +364,7 @@ private[parquet] class CatalystSchemaConverter(
       // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
       // hasn't implemented `TIMESTAMP_MICROS` yet.
      //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
       case TimestampType =>
         Types.primitive(INT96, repetition).named(field.name)

@@ -523,11 +526,12 @@ private[parquet] class CatalystSchemaConverter(
     }
   }

-
 private[parquet] object CatalystSchemaConverter {
+  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
-    analysisRequire(
+    checkConversionRequirement(
       !name.matches(".*[ ,;{}()\n\t=].*"),
       s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
          |Please use alias to rename it.
@@ -539,7 +543,7 @@ private[parquet] object CatalystSchemaConverter {
     schema
   }

-  def analysisRequire(f: => Boolean, message: String): Unit = {
+  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)
     }
@@ -553,16 +557,8 @@ private[parquet] object CatalystSchemaConverter {
     numBytes
   }

-  private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
   // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
-  def minBytesForPrecision(precision : Int) : Int = {
-    if (precision < MIN_BYTES_FOR_PRECISION.length) {
-      MIN_BYTES_FOR_PRECISION(precision)
-    } else {
-      computeMinBytesForPrecision(precision)
-    }
-  }
+  val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)

   val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */