
Commit a1303de

liancheng authored and rxin committed
[SPARK-13070][SQL] Better error message when Parquet schema merging fails
Make sure we throw better error messages when Parquet schema merging fails.

Author: Cheng Lian <[email protected]>
Author: Liang-Chi Hsieh <[email protected]>

Closes #10979 from viirya/schema-merging-failure-message.
1 parent de28371 commit a1303de
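
Before the diffs, here is a minimal end-to-end sketch of the behavior this commit improves. It is illustrative, not part of the patch: it assumes a Spark 1.6-era build with a local master, and DemoSchemaMergeError plus the temp-path handling are made up for the example. It writes two Parquet directories whose "id" columns have conflicting types, then reads them back with schema merging enabled, mirroring the new test in ParquetSchemaSuite.

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.sql.SQLContext

object DemoSchemaMergeError {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    val sqlContext = new SQLContext(sc)
    val path = java.nio.file.Files.createTempDirectory("merge-demo").toString

    // Two Parquet directories whose "id" columns have conflicting types.
    sqlContext.range(3).write.parquet(s"$path/p=1")                                     // id: bigint
    sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2") // id: int

    try {
      sqlContext.read.option("mergeSchema", "true").parquet(path).schema
    } catch {
      // After this patch the message should name the offending file, e.g.
      // "Failed merging schema of file file:/.../p=2/part-....parquet: ...",
      // rather than surfacing a bare type-merge error.
      case e: SparkException => println(e.getMessage)
    } finally {
      sc.stop()
    }
  }
}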


4 files changed: +77, -7 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 3 additions & 3 deletions
@@ -424,13 +424,13 @@ object StructType extends AbstractDataType {
       if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
         DecimalType(leftPrecision, leftScale)
       } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
-        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+        throw new SparkException("Failed to merge decimal types with incompatible " +
           s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
       } else if (leftPrecision != rightPrecision) {
-        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+        throw new SparkException("Failed to merge decimal types with incompatible " +
           s"precision $leftPrecision and $rightPrecision")
       } else {
-        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+        throw new SparkException("Failed to merge decimal types with incompatible " +
           s"scala $leftScale and $rightScale")
       }
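
For context, a small sketch of what exercises the corrected message. It is illustrative and not part of the patch; DecimalMergeDemo is made up, and it sits in the org.apache.spark.sql package because StructType.merge is private[sql] at this point in Spark's history.

package org.apache.spark.sql

import org.apache.spark.SparkException
import org.apache.spark.sql.types._

object DecimalMergeDemo {
  def main(args: Array[String]): Unit = {
    // Two decimal fields that agree on scale but not precision.
    val left  = StructType(Seq(StructField("d", DecimalType(10, 2))))
    val right = StructType(Seq(StructField("d", DecimalType(12, 2))))
    try {
      left.merge(right) // precision 10 vs 12, same scale
    } catch {
      // The message should now read "Failed to merge decimal types with
      // incompatible precision 10 and 12" instead of "... Decimal Tpes ...".
      case e: SparkException => println(e.getMessage)
    }
  }
}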

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 29 additions & 4 deletions
@@ -800,12 +800,37 @@ private[sql] object ParquetRelation extends Logging {
           assumeInt96IsTimestamp = assumeInt96IsTimestamp,
           writeLegacyParquetFormat = writeLegacyParquetFormat)

-        footers.map { footer =>
-          ParquetRelation.readSchemaFromFooter(footer, converter)
-        }.reduceLeftOption(_ merge _).iterator
+        if (footers.isEmpty) {
+          Iterator.empty
+        } else {
+          var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+          footers.tail.foreach { footer =>
+            val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+            try {
+              mergedSchema = mergedSchema.merge(schema)
+            } catch { case cause: SparkException =>
+              throw new SparkException(
+                s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
+            }
+          }
+          Iterator.single(mergedSchema)
+        }
       }.collect()

-    partiallyMergedSchemas.reduceLeftOption(_ merge _)
+    if (partiallyMergedSchemas.isEmpty) {
+      None
+    } else {
+      var finalSchema = partiallyMergedSchemas.head
+      partiallyMergedSchemas.tail.foreach { schema =>
+        try {
+          finalSchema = finalSchema.merge(schema)
+        } catch { case cause: SparkException =>
+          throw new SparkException(
+            s"Failed merging schema:\n${schema.treeString}", cause)
+        }
+      }
+      Some(finalSchema)
+    }
   }

   /**
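
Both hunks have the same shape: replace reduceLeftOption(_ merge _) with an explicit left fold so the failing input can be reported. A self-contained sketch of that pattern follows; Schema, mergeAll, and MergeAllDemo are illustrative stand-ins, not Spark APIs.

import scala.util.control.NonFatal

// A toy "schema" that refuses to merge conflicting field types.
final case class Schema(fields: Map[String, String]) {
  def merge(that: Schema): Schema = {
    val shared = fields.keySet intersect that.fields.keySet
    shared.find(k => fields(k) != that.fields(k)).foreach { k =>
      throw new IllegalArgumentException(
        s"Failed to merge incompatible types for '$k': ${fields(k)} vs ${that.fields(k)}")
    }
    Schema(fields ++ that.fields)
  }
}

object MergeAllDemo {
  // Fold one schema at a time; on failure, re-wrap the low-level error with
  // the input that introduced it, as the patch does with footer.getFile and
  // schema.treeString.
  def mergeAll(schemas: Seq[(String, Schema)]): Option[Schema] =
    if (schemas.isEmpty) None
    else {
      var merged = schemas.head._2
      schemas.tail.foreach { case (file, schema) =>
        try merged = merged.merge(schema)
        catch {
          case NonFatal(cause) =>
            throw new RuntimeException(s"Failed merging schema of file $file", cause)
        }
      }
      Some(merged)
    }

  def main(args: Array[String]): Unit = {
    val ok = mergeAll(Seq(
      "a.parquet" -> Schema(Map("id" -> "bigint")),
      "b.parquet" -> Schema(Map("name" -> "string"))))
    println(ok) // Some(Schema(Map(id -> bigint, name -> string)))

    mergeAll(Seq(
      "a.parquet" -> Schema(Map("id" -> "bigint")),
      "b.parquet" -> Schema(Map("id" -> "int")))) // throws, naming b.parquet
  }
}

Note the asymmetry in the two hunks: the first fold runs inside tasks and can name the Parquet file (footer.getFile); the second runs on the driver over already-merged partial schemas, where no single file is attributable, hence the shorter "Failed merging schema:" message.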

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -421,6 +421,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // We will remove the temporary metadata when writing Parquet file.
       val forPathSix = sqlContext.read.parquet(pathSix).schema
       assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+      // sanity test: make sure optional metadata field is not wrongly set.
+      val pathSeven = s"${dir.getCanonicalPath}/table7"
+      (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+      val pathEight = s"${dir.getCanonicalPath}/table8"
+      (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+      val df2 = sqlContext.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+      checkAnswer(
+        df2,
+        Row(1, "1"))
+
+      // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
+      assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+      assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
     }
   }
 }
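
For reference, the metadata check these new assertions rely on behaves as below. This is a hedged snippet with an illustrative key name ("optional" and MetadataCheckDemo are made up); the suite itself uses StructType.metadataKeyForOptionalField.

import org.apache.spark.sql.types._

object MetadataCheckDemo {
  def main(args: Array[String]): Unit = {
    // A field carries no metadata keys unless one is explicitly attached.
    val plain = StructField("a", IntegerType)
    assert(!plain.metadata.contains("optional")) // illustrative key name

    val tagged = plain.copy(
      metadata = new MetadataBuilder().putBoolean("optional", true).build())
    assert(tagged.metadata.contains("optional"))
    println("metadata round-trips as expected")
  }
}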

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag

 import org.apache.parquet.schema.MessageTypeParser

+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -449,6 +450,35 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }.getMessage.contains("detected conflicting schemas"))
   }

+  test("schema merging failure error message") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema of file"))
+    }
+
+    // test for second merging (after read Parquet schema in parallel done)
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+      sqlContext.sparkContext.conf.set("spark.default.parallelism", "20")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema:"))
+    }
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================
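
A note on the second case: bumping spark.default.parallelism to 20 spreads the two footers across separate tasks, so each task-side partial merge sees a single schema and trivially succeeds. The conflict then surfaces only in the driver-side final merge, which is exactly the code path that produces the shorter "Failed merging schema:" message, as the test's own comment ("test for second merging") indicates.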
