
Commit 8ab5076

[SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter

This PR introduces `CatalystSchemaConverter` for converting Parquet schemas to Spark SQL schemas and vice versa. The original conversion code in `ParquetTypesConverter` is removed. Benefits of the new version:

1. When converting Spark SQL schemas, it generates standard Parquet schemas conforming to [the most up-to-date Parquet format spec][1]. Converting to old-style Parquet schemas is also supported via the feature flag `spark.sql.parquet.followParquetFormatSpec` (set to `false` for now; it should be set to `true` once both the read and write paths are fixed). Note that although this version of the Parquet format spec hasn't been officially released yet, Parquet MR 1.7.0 already sticks to it, so it should be safe to follow.
2. It implements the backwards-compatibility rules described in the most up-to-date Parquet format spec, and can therefore recognize more schema patterns generated by other/legacy systems and tools.
3. Code organization follows the convention used in [parquet-mr][2], which is easier to follow. (The structure of `CatalystSchemaConverter` is similar to that of `AvroSchemaConverter`.)

To fully implement the backwards-compatibility rules on both the read and write paths, we also need to update `CatalystRowConverter` (which is responsible for converting Parquet records to `Row`s), `RowReadSupport`, and `RowWriteSupport`. These will be done in follow-up PRs.

TODO

- [x] More schema conversion test cases for legacy schema patterns.

[1]: https://github.com/apache/parquet-format/blob/ea095226597fdbecd60c2419d96b54b2fdb4ae6c/LogicalTypes.md
[2]: https://github.com/apache/parquet-mr/

Author: Cheng Lian <[email protected]>

Closes apache#6617 from liancheng/spark-6777 and squashes the following commits:

2a2062d [Cheng Lian] Don't convert decimals without precision information
b60979b [Cheng Lian] Adds a constructor which accepts a Configuration, and fixes default value of assumeBinaryIsString
743730f [Cheng Lian] Decimal scale shouldn't be larger than precision
a104a9e [Cheng Lian] Fixes Scala style issue
1f71d8d [Cheng Lian] Adds feature flag to allow falling back to old style Parquet schema conversion
ba84f4b [Cheng Lian] Fixes MapType schema conversion bug
13cb8d5 [Cheng Lian] Fixes MiMa failure
81de5b0 [Cheng Lian] Fixes UDT, workaround read path, and add tests
28ef95b [Cheng Lian] More AnalysisExceptions
b10c322 [Cheng Lian] Replaces require() with analysisRequire() which throws AnalysisException
cceaf3f [Cheng Lian] Implements backwards compatibility rules in CatalystSchemaConverter
1 parent fb32c38 commit 8ab5076
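
For orientation, here is a minimal sketch of how the new converter and flag described above are meant to be used. The class name and the three flags come from this PR's description and commit log; the exact constructor signature and `convert` overloads are assumptions, not verbatim from the patch.

```scala
// Hedged sketch only: parameter names follow the flags discussed in this PR;
// the actual signatures in the patch may differ.
import org.apache.parquet.schema.MessageType // parquet-mr 1.7.0 namespace
import org.apache.spark.sql.types.StructType

// Parquet -> Spark SQL, applying the spec's backwards-compatibility rules so
// that legacy LIST/MAP layouts written by older systems are still recognized.
def toSparkSchema(parquetSchema: MessageType): StructType =
  new CatalystSchemaConverter(
    assumeBinaryIsString = false,    // un-annotated BINARY stays BinaryType
    assumeInt96IsTimestamp = true,   // legacy Impala/Hive INT96 timestamps
    followParquetFormatSpec = false  // compatible mode until both paths are fixed
  ).convert(parquetSchema)

// Spark SQL -> Parquet; with followParquetFormatSpec = true the converter
// emits standard schemas per the updated parquet-format spec rather than the
// old-style layout.
def toParquetSchema(sparkSchema: StructType): MessageType =
  new CatalystSchemaConverter(
    assumeBinaryIsString = false,
    assumeInt96IsTimestamp = true,
    followParquetFormatSpec = true
  ).convert(sparkSchema)
```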

File tree

9 files changed: +1297 -422 lines


project/MimaExcludes.scala

Lines changed: 6 additions & 1 deletion

@@ -69,7 +69,12 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem](
           "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
         ProblemFilters.exclude[MissingClassProblem](
-          "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
+          "org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
+        // SPARK-6777 Implements backwards compatibility rules in CatalystSchemaConverter
+        ProblemFilters.exclude[MissingClassProblem](
+          "org.apache.spark.sql.parquet.ParquetTypeInfo"),
+        ProblemFilters.exclude[MissingClassProblem](
+          "org.apache.spark.sql.parquet.ParquetTypeInfo$")
       )
     case v if v.startsWith("1.4") =>
       Seq(

sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala

Lines changed: 7 additions & 2 deletions

@@ -20,13 +20,18 @@ package org.apache.spark.sql.types
 import scala.reflect.runtime.universe.typeTag

 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.Expression


 /** Precision parameters for a Decimal */
-case class PrecisionInfo(precision: Int, scale: Int)
-
+case class PrecisionInfo(precision: Int, scale: Int) {
+  if (scale > precision) {
+    throw new AnalysisException(
+      s"Decimal scale ($scale) cannot be greater than precision ($precision).")
+  }
+}

 /**
  * :: DeveloperApi ::
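
The effect of the new check, as a minimal sketch (the exception message is taken from the diff above; `PrecisionInfo` is the case class being changed):

```scala
import org.apache.spark.sql.types.PrecisionInfo

PrecisionInfo(precision = 10, scale = 4) // fine: scale <= precision
PrecisionInfo(precision = 4, scale = 10) // throws AnalysisException:
// "Decimal scale (10) cannot be greater than precision (4)."
```

Putting the check in the case class body means it runs on construction, so every code path that builds a `PrecisionInfo` is validated, and the failure surfaces as an `AnalysisException` rather than the generic error a plain `require()` would raise (see commit b10c322 above).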

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 14 additions & 0 deletions

@@ -264,6 +264,14 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")

+  val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
+    key = "spark.sql.parquet.followParquetFormatSpec",
+    defaultValue = Some(false),
+    doc = "Whether to stick to Parquet format specification when converting Parquet schema to " +
+      "Spark SQL schema and vice versa. Sticks to the specification if set to true; falls back " +
+      "to compatible mode if set to false.",
+    isPublic = false)
+
   val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
     key = "spark.sql.parquet.output.committer.class",
     defaultValue = Some(classOf[ParquetOutputCommitter].getName),
@@ -498,6 +506,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    */
  private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

+  /**
+   * When set to true, sticks to Parquet format spec when converting Parquet schema to Spark SQL
+   * schema and vice versa. Otherwise, falls back to compatible mode.
+   */
+  private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+
   /**
    * When set to true, partition pruning for in-memory columnar tables is enabled.
    */
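
Because the new entry is registered with `isPublic = false`, it is an internal option, but it can still be set by key. A small usage sketch (assumes a Spark 1.5-era `SQLContext` named `sqlContext`):

```scala
// Opt in to spec-compliant schema conversion; the default stays false until
// the read and write paths are fixed (per the PR description).
sqlContext.setConf("spark.sql.parquet.followParquetFormatSpec", "true")

// Internal code reads the flag through the accessor added in this diff:
//   sqlContext.conf.followParquetFormatSpec  // => true
```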
