
Commit b6946e6

Fixes MiMA issues, addresses comments
1 parent 8232e17 commit b6946e6

3 files changed: +89 -65 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 67 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.sql.types
 
 import java.sql.Timestamp
 
+import scala.collection.mutable.ArrayBuffer
 import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
@@ -29,6 +30,7 @@ import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
@@ -159,7 +161,6 @@ object DataType {
       case failure: NoSuccess =>
         throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
     }
-
   }
 
   protected[types] def buildFormattedString(
@@ -754,6 +755,57 @@ object StructType {
   def apply(fields: java.util.List[StructField]): StructType = {
     StructType(fields.toArray.asInstanceOf[Array[StructField]])
   }
+
+  private[sql] def merge(left: DataType, right: DataType): DataType =
+    (left, right) match {
+      case (ArrayType(leftElementType, leftContainsNull),
+            ArrayType(rightElementType, rightContainsNull)) =>
+        ArrayType(
+          merge(leftElementType, rightElementType),
+          leftContainsNull || rightContainsNull)
+
+      case (MapType(leftKeyType, leftValueType, leftContainsNull),
+            MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+        MapType(
+          merge(leftKeyType, rightKeyType),
+          merge(leftValueType, rightValueType),
+          leftContainsNull || rightContainsNull)
+
+      case (StructType(leftFields), StructType(rightFields)) =>
+        val newFields = ArrayBuffer.empty[StructField]
+
+        leftFields.foreach {
+          case leftField @ StructField(leftName, leftType, leftNullable, _) =>
+            rightFields
+              .find(_.name == leftName)
+              .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
+                leftField.copy(
+                  dataType = merge(leftType, rightType),
+                  nullable = leftNullable || rightNullable)
+              }
+              .orElse(Some(leftField))
+              .foreach(newFields += _)
+        }
+
+        rightFields
+          .filterNot(f => leftFields.map(_.name).contains(f.name))
+          .foreach(newFields += _)
+
+        StructType(newFields)
+
+      case (DecimalType.Fixed(leftPrecision, leftScale),
+            DecimalType.Fixed(rightPrecision, rightScale)) =>
+        DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))
+
+      case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
+          if leftUdt.userClass == rightUdt.userClass => leftUdt
+
+      case (leftType, rightType) if leftType == rightType =>
+        leftType
+
+      case _ =>
+        throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+    }
 }
 
 
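The decimal branch above widens both precision and scale rather than failing; a quick sketch of the intended behavior (hypothetical values, and note that merge is private[sql], so these calls only compile inside Spark's own org.apache.spark.sql packages):

import org.apache.spark.sql.types._

// Widening two fixed decimals keeps the larger precision and the larger scale:
StructType.merge(DecimalType(5, 2), DecimalType(10, 1))  // DecimalType(10, 2)

// Identical types pass through unchanged; incompatible ones fail fast:
StructType.merge(IntegerType, IntegerType)  // IntegerType
StructType.merge(IntegerType, StringType)   // throws SparkException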
@@ -890,6 +942,20 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
     val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
     s"struct<${fieldTypes.mkString(",")}>"
   }
+
+  /**
+   * Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
+   * B from `that`,
+   *
+   * 1. If A and B have the same name and data type, they are merged to a field C with the same name
+   *    and data type. C is nullable if and only if either A or B is nullable.
+   * 2. If A doesn't exist in `that`, it's included in the result schema.
+   * 3. If B doesn't exist in `this`, it's also included in the result schema.
+   * 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception would be
+   *    thrown.
+   */
+  private[sql] def merge(that: StructType): StructType =
+    StructType.merge(this, that).asInstanceOf[StructType]
 }
 
 
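To make the four rules concrete, here is a small sketch (hypothetical field names; again, merge is private[sql], so this only compiles within Spark's sql packages):

import org.apache.spark.sql.types._

val left = StructType(Seq(
  StructField("a", IntegerType, nullable = false),
  StructField("b", StringType)))                    // only in `left`

val right = StructType(Seq(
  StructField("a", IntegerType, nullable = true),   // same name and type as left's "a"
  StructField("c", DoubleType)))                    // only in `right`

left.merge(right)
// struct<a:int,b:string,c:double>: "a" becomes nullable (rule 1),
// "b" and "c" are carried over unchanged (rules 2 and 3)

left.merge(StructType(StructField("a", StringType) :: Nil))
// throws SparkException: "a" is int on one side and string on the other (rule 4)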
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 2 additions & 56 deletions
@@ -284,7 +284,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       ctype: DataType,
       name: String,
       nullable: Boolean = true,
-      inArray: Boolean = false,
+      inArray: Boolean = false,
       toThriftSchemaNames: Boolean = false): ParquetType = {
     val repetition =
       if (inArray) {
@@ -339,7 +339,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
     case StructType(structFields) => {
       val fields = structFields.map {
-        field => fromDataType(field.dataType, field.name, field.nullable,
+        field => fromDataType(field.dataType, field.name, field.nullable,
           inArray = false, toThriftSchemaNames)
       }
       new ParquetGroupType(repetition, name, fields.toSeq)
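(In both hunks above the removed and added lines read identically; the changes are apparently whitespace-only fixes, e.g. trailing spaces or indentation picked up during review.)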
@@ -522,58 +522,4 @@ private[parquet] object ParquetTypesConverter extends Logging {
       attributes
     }
   }
-
-  def mergeCatalystSchemas(left: StructType, right: StructType): StructType =
-    mergeCatalystDataTypes(left, right).asInstanceOf[StructType]
-
-  def mergeCatalystDataTypes(left: DataType, right: DataType): DataType =
-    (left, right) match {
-      case (ArrayType(leftElementType, leftContainsNull),
-            ArrayType(rightElementType, rightContainsNull)) =>
-        ArrayType(
-          mergeCatalystDataTypes(leftElementType, rightElementType),
-          leftContainsNull || rightContainsNull)
-
-      case (MapType(leftKeyType, leftValueType, leftContainsNull),
-            MapType(rightKeyType, rightValueType, rightContainsNull)) =>
-        MapType(
-          mergeCatalystDataTypes(leftKeyType, rightKeyType),
-          mergeCatalystDataTypes(leftValueType, rightValueType),
-          leftContainsNull || rightContainsNull)
-
-      case (StructType(leftFields), StructType(rightFields)) =>
-        val newFields = ArrayBuffer.empty[StructField]
-
-        leftFields.foreach {
-          case leftField @ StructField(leftName, leftType, leftNullable, _) =>
-            rightFields
-              .find(_.name == leftName)
-              .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
-                leftField.copy(
-                  dataType = mergeCatalystDataTypes(leftType, rightType),
-                  nullable = leftNullable || rightNullable)
-              }
-              .orElse(Some(leftField))
-              .foreach(newFields += _)
-        }
-
-        rightFields
-          .filterNot(f => leftFields.map(_.name).contains(f.name))
-          .foreach(newFields += _)
-
-        StructType(newFields)
-
-      case (DecimalType.Fixed(leftPrecision, leftScale),
-            DecimalType.Fixed(rightPrecision, rightScale)) =>
-        DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))
-
-      case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
-          if leftUdt.userClass == rightUdt.userClass => leftUdt
-
-      case (leftType, rightType) if leftType == rightType =>
-        leftType
-
-      case _ =>
-        throw new SparkException(s"Failed to merge incompatible data types $left and $right")
-    }
 }

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 20 additions & 8 deletions
@@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.parquet.ParquetTypesConverter._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType, _}
+import org.apache.spark.sql.types.StructType._
 import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext}
 import org.apache.spark.{Partition => SparkPartition, TaskContext, SerializableWritable, Logging, SparkException}
 
@@ -173,7 +174,7 @@ case class ParquetRelation2
     val statuses = paths.distinct.map(p => fs.getFileStatus(fs.makeQualified(new Path(p))))
     // Support either reading a collection of raw Parquet part-files, or a collection of folders
     // containing Parquet files (e.g. partitioned Parquet table).
-    assert(statuses.forall(_.isFile) || statuses.forall(_.isDir))
+    assert(statuses.forall(!_.isDir) || statuses.forall(_.isDir))
     statuses.toArray
   }
 
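The rewritten assertion is presumably the same kind of compatibility fix as the `Path.isRoot` change later in this commit: `FileStatus.isFile` does not exist in older Hadoop releases, while `isDir` does, so `!_.isDir` expresses "is a file" in a way both old and new Hadoop versions support.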
@@ -252,11 +253,18 @@
       // we can't trust the summary files if users require a merged schema, and must touch all part-
       // files to do the merge.
       if (shouldMergeSchemas) {
-        dataStatuses.toSeq
+        // Also includes summary files, 'cause there might be empty partition directories.
+        (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
       } else {
+        // Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
+        // don't have this.
         commonMetadataStatuses.headOption
+          // Falls back to "_metadata"
           .orElse(metadataStatuses.headOption)
-          // Summary file(s) not found, falls back to the first part-file.
+          // Summary file(s) not found: the Parquet file is either corrupted, or different part-
+          // files contain conflicting user defined metadata (two or more values are associated
+          // with the same key in different files). In either case, we fall back to the first
+          // part-file, and just assume all schemas are consistent.
           .orElse(dataStatuses.headOption)
           .toSeq
       }
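The else branch is a plain Option fallback chain; in miniature (hypothetical file names, not part of the diff):

// Pick the first summary file that exists, otherwise the first part-file:
val commonMetadata: Option[String] = None              // no "_common_metadata" present
val metadata: Option[String] = Some("_metadata")
val parts = Seq("part-00000.parquet", "part-00001.parquet")

val picked = commonMetadata
  .orElse(metadata)
  .orElse(parts.headOption)
  .toSeq
// Seq("_metadata")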
@@ -507,14 +515,17 @@ case class ParquetRelation2
 
 object ParquetRelation2 {
   // Whether we should merge schemas collected from all Parquet part-files.
-  val MERGE_SCHEMA = "parquet.mergeSchema"
+  val MERGE_SCHEMA = "mergeSchema"
 
   // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
-  val METASTORE_SCHEMA = "parquet.metastoreSchema"
+  val METASTORE_SCHEMA = "metastoreSchema"
 
   // Default partition name to use when the partition column value is null or empty string
   val DEFAULT_PARTITION_NAME = "partition.defaultName"
 
+  // When true, the Parquet data source caches Parquet metadata for performance
+  val CACHE_METADATA = "cacheMetadata"
+
   private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
     footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
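These constants are the option keys users pass to the data source. A hedged usage sketch against the Spark 1.3-era API (the load call and all values here are assumptions for illustration, not part of this diff):

// Hypothetical: enable schema merging and metadata caching for a Parquet table.
val df = sqlContext.load(
  "org.apache.spark.sql.parquet",
  Map(
    "path" -> "/data/events",      // hypothetical location
    "mergeSchema" -> "true",       // ParquetRelation2.MERGE_SCHEMA
    "cacheMetadata" -> "true"))    // ParquetRelation2.CACHE_METADATA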
@@ -535,7 +546,7 @@ object ParquetRelation2 {
           sqlContext.conf.isParquetINT96AsTimestamp))
       }
     }.reduce { (left, right) =>
-      try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
+      try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
     }
@@ -637,14 +648,15 @@
       path: Path,
       defaultPartitionName: String): PartitionValues = {
     val columns = ArrayBuffer.empty[(String, Literal)]
-    var finished = path.isRoot
+    // Old Hadoop versions don't have `Path.isRoot`
+    var finished = path.getParent == null
     var chopped = path
 
     while (!finished) {
       val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
       maybeColumn.foreach(columns += _)
       chopped = chopped.getParent
-      finished = maybeColumn.isEmpty || chopped.isRoot
+      finished = maybeColumn.isEmpty || chopped.getParent == null
     }
 
     val (columnNames, values) = columns.reverse.unzip