Commit 5654c9d

Draft version of Parquet partition discovery and schema merging
1 parent 4d8d070 commit 5654c9d

File tree

3 files changed: +373 -138 lines changed


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 16 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
     val method = context.getClass.getMethod("getConfiguration")
     method.invoke(context).asInstanceOf[Configuration]
   }
+
+  /**
+   * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+   * given path points to a file, return a single-element collection containing [[FileStatus]] of
+   * that file.
+   */
+  def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+    def recurse(path: Path) = {
+      val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
+      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+    }
+
+    val baseStatus = fs.getFileStatus(basePath)
+    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+  }
 }
 
 object SparkHadoopUtil {
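
The new listLeafStatuses helper looks like the groundwork for partition discovery: it flattens an arbitrarily nested key=value directory layout into the flat list of data files that the Parquet code can then parse. Below is a rough usage sketch, not part of this commit; SparkHadoopUtil.get (assumed accessible here), the table path, and the println are illustrative only.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.deploy.SparkHadoopUtil

object ListLeafStatusesSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical root of a partitioned Parquet table, e.g.
    // hdfs:///warehouse/events/date=2015-01-01/part-r-00000.parquet
    val basePath = new Path("hdfs:///warehouse/events")
    val fs = basePath.getFileSystem(new Configuration())

    // All data files under basePath, however deeply the key=value directories are nested.
    val leaves: Seq[FileStatus] = SparkHadoopUtil.get.listLeafStatuses(fs, basePath)

    // Partition discovery would parse the key=value path segments; here we only print them.
    leaves.map(_.getPath).foreach(println)
  }
}

Note that a base path pointing at a single file comes back as a one-element collection, so callers do not need to special-case unpartitioned data.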

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 57 additions & 7 deletions
@@ -19,24 +19,23 @@ package org.apache.spark.sql.parquet
 
 import java.io.IOException
 
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
-
 import parquet.format.converter.ParquetMetadataConverter
-import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
-import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
+import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
 import parquet.hadoop.util.ContextUtil
-import parquet.schema.{Type => ParquetType, Types => ParquetTypes, PrimitiveType => ParquetPrimitiveType, MessageType}
-import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns, DecimalMetadata}
+import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
 import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
 import parquet.schema.Type.Repetition
+import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
 
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types._
+import org.apache.spark.{Logging, SparkException}
 
 // Implicits
 import scala.collection.JavaConversions._
@@ -523,4 +522,55 @@ private[parquet] object ParquetTypesConverter extends Logging {
       attributes
     }
   }
+
+  def mergeCatalystSchemas(left: StructType, right: StructType): StructType =
+    mergeCatalystDataTypes(left, right).asInstanceOf[StructType]
+
+  def mergeCatalystDataTypes(left: DataType, right: DataType): DataType =
+    (left, right) match {
+      case (ArrayType(leftElementType, leftContainsNull),
+            ArrayType(rightElementType, rightContainsNull)) =>
+        ArrayType(
+          mergeCatalystDataTypes(leftElementType, rightElementType),
+          leftContainsNull || rightContainsNull)
+
+      case (MapType(leftKeyType, leftValueType, leftContainsNull),
+            MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+        MapType(
+          mergeCatalystDataTypes(leftKeyType, rightKeyType),
+          mergeCatalystDataTypes(leftValueType, rightValueType),
+          leftContainsNull || rightContainsNull)
+
+      case (StructType(leftFields), StructType(rightFields)) =>
+        val newFields = ArrayBuffer.empty[StructField]
+
+        leftFields.foreach {
+          case leftField @ StructField(leftName, leftType, leftNullable, leftMetadata) =>
+            rightFields
+              .find(_.name == leftName)
+              .map { case rightField @ StructField(_, rightType, rightNullable, rightMeatadata) =>
+                leftField.copy(
+                  dataType = mergeCatalystDataTypes(leftType, rightType),
+                  nullable = leftNullable || rightNullable)
+              }
+              .orElse(Some(leftField))
+              .foreach(newFields += _)
+        }
+
+        rightFields
+          .filterNot(f => leftFields.map(_.name).contains(f.name))
+          .foreach(newFields += _)
+
+        StructType(newFields)
+
+      case (DecimalType.Fixed(leftPrecision, leftScale),
+            DecimalType.Fixed(rightPrecision, rightScale)) =>
+        DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))
+
+      case (leftType, rightType) if leftType == rightType =>
+        leftType
+
+      case _ =>
+        throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+    }
 }
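
To make the intended merging behaviour concrete, here is a small, hypothetical driver (not part of this commit) that feeds two hand-written schemas to mergeCatalystSchemas. It is placed in the org.apache.spark.sql.parquet package because ParquetTypesConverter is private[parquet]; every field name and type in it is made up.

package org.apache.spark.sql.parquet

import org.apache.spark.sql.types._

object SchemaMergeSketch {
  def main(args: Array[String]): Unit = {
    val left = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("price", DecimalType(10, 2), nullable = false)))

    val right = StructType(Seq(
      StructField("id", LongType, nullable = true),                // widens nullability of "id"
      StructField("price", DecimalType(12, 4), nullable = false),  // widens precision and scale
      StructField("comment", StringType, nullable = true)))        // only present on this side

    // Expected result: id as a nullable LongType, price as DecimalType(12, 4),
    // and comment carried over unchanged from the right-hand schema.
    println(ParquetTypesConverter.mergeCatalystSchemas(left, right))
  }
}

The merge widens nullability and fixed-precision decimals rather than failing, keeps fields that appear on only one side, and reserves the SparkException case for type pairs that are neither structurally mergeable nor identical.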
