
Commit a9ed511

liancheng authored and marmbrus committed
[SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-3575] [SQL] Parquet data source improvements
This PR adds three major improvements to the Parquet data source:

1. Partition discovery

   When reading Parquet files that reside in Hive-style partition directories, `ParquetRelation2` automatically discovers partitioning information and infers partition column types. This is also partial work for [SPARK-5182] [1], which aims to provide first-class partitioning support for the data source API. Related code in this PR can easily be extracted to the data source API level in future versions.

2. Schema merging

   When enabled, the Parquet data source collects schema information from all Parquet part-files and tries to merge them. Exceptions are thrown when incompatible schemas are detected. This feature is controlled by the data source option `parquet.mergeSchema` and is enabled by default.

3. Metastore Parquet table conversion moved to the analysis phase

   This greatly simplifies the conversion logic. The `ParquetConversion` strategy can be removed once the old Parquet implementation is removed in the future.

This version of the Parquet data source aims to entirely replace the old Parquet implementation. However, the old version hasn't been removed yet. Users can fall back to it by turning off the SQL configuration `spark.sql.parquet.useDataSourceApi`.

Other JIRA tickets fixed as side effects of this PR:

- [SPARK-5509] [3]: `EqualTo` now uses a proper `Ordering` to compare binary types.
- [SPARK-3575] [4]: The metastore schema is now preserved and passed to `ParquetRelation2` via the data source option `parquet.metastoreSchema`.

TODO:

- [ ] More test cases for partition discovery
- [x] Fix the write path after data source write support (apache#4294) is merged

  It turned out to be non-trivial to fall back to the old Parquet implementation on the write path when the Parquet data source is enabled. Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.

- [ ] Fix outdated comments and documentation

PS: This PR looks big, but more than half of the changed lines are trivial changes to test cases. To test Parquet with and without the new data source, almost all Parquet test cases are moved into wrapper driver functions, which introduces hundreds of lines of changes.
[1]: https://issues.apache.org/jira/browse/SPARK-5182
[2]: https://issues.apache.org/jira/browse/SPARK-5528
[3]: https://issues.apache.org/jira/browse/SPARK-5509
[4]: https://issues.apache.org/jira/browse/SPARK-3575

Author: Cheng Lian <[email protected]>

Closes apache#4308 from liancheng/parquet-partition-discovery and squashes the following commits:

b6946e6 [Cheng Lian] Fixes MiMA issues, addresses comments
8232e17 [Cheng Lian] Write support for Parquet data source
a49bd28 [Cheng Lian] Fixes spelling typo in trait name "CreateableRelationProvider"
808380f [Cheng Lian] Fixes issues introduced while rebasing
50dd8d1 [Cheng Lian] Addresses @rxin's comment, fixes UDT schema merging
adf2aae [Cheng Lian] Fixes compilation error introduced while rebasing
4e0175f [Cheng Lian] Fixes Python Parquet API, we need Py4J array to call varargs method
0d8ec1d [Cheng Lian] Adds more test cases
b35c8c6 [Cheng Lian] Fixes some typos and outdated comments
dd704fd [Cheng Lian] Fixes Python Parquet API
596c312 [Cheng Lian] Uses switch to control whether use Parquet data source or not
7d0f7a2 [Cheng Lian] Fixes Metastore Parquet table conversion
a1896c7 [Cheng Lian] Fixes all existing Parquet test suites except for ParquetMetastoreSuite
5654c9d [Cheng Lian] Draft version of Parquet partition discovery and schema merging
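
A rough usage sketch of the partition discovery described above (the directory layout, paths, and table name are hypothetical; only `parquetFile` and the `spark.sql.parquet.useDataSourceApi` flag come from this change, everything else is standard Spark SQL usage assumed for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("parquet-example").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    // Hypothetical Hive-style partitioned layout:
    //   /data/events/year=2014/month=12/part-r-00001.parquet
    //   /data/events/year=2015/month=01/part-r-00001.parquet
    // With spark.sql.parquet.useDataSourceApi left at its default ("true"), reading the base
    // directory goes through ParquetRelation2, which should discover `year` and `month` as
    // partition columns and infer their types.
    val events = sqlContext.parquetFile("/data/events")
    events.registerTempTable("events")
    sqlContext.sql("SELECT count(*) FROM events WHERE year = 2015").collect()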
1 parent c19152c commit a9ed511

24 files changed (+1541, -736 lines)


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 16 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
     val method = context.getClass.getMethod("getConfiguration")
     method.invoke(context).asInstanceOf[Configuration]
   }
+
+  /**
+   * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
+   * given path points to a file, return a single-element collection containing [[FileStatus]] of
+   * that file.
+   */
+  def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
+    def recurse(path: Path) = {
+      val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
+      leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
+    }
+
+    val baseStatus = fs.getFileStatus(basePath)
+    if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
+  }
 }
 
 object SparkHadoopUtil {
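
The new `listLeafStatuses` helper recursively enumerates files under a directory tree, which is the kind of listing partition discovery needs when part-files sit in nested partition directories. A minimal calling sketch (the path is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.deploy.SparkHadoopUtil

    val fs = FileSystem.get(new Configuration())
    // All leaf files under /data/events, however deeply nested; a path that points at a
    // single file yields just that file's FileStatus.
    val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, new Path("/data/events"))
    leaves.foreach(status => println(status.getPath))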

python/pyspark/sql.py

Lines changed: 7 additions & 2 deletions
@@ -1471,7 +1471,7 @@ def registerRDDAsTable(self, rdd, tableName):
         else:
             raise ValueError("Can only register DataFrame as table")
 
-    def parquetFile(self, path):
+    def parquetFile(self, *paths):
         """Loads a Parquet file, returning the result as a L{DataFrame}.
 
         >>> import tempfile, shutil
@@ -1483,7 +1483,12 @@ def parquetFile(self, path):
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
         """
-        jdf = self._ssql_ctx.parquetFile(path)
+        gateway = self._sc._gateway
+        jpath = paths[0]
+        jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths) - 1)
+        for i in range(1, len(paths)):
+            jpaths[i] = paths[i]
+        jdf = self._ssql_ctx.parquetFile(jpath, jpaths)
         return DataFrame(jdf, self)
 
     def jsonFile(self, path, schema=None, samplingRatio=1.0):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 5 additions & 2 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.types.{BinaryType, BooleanType}
 
 object InterpretedPredicate {
   def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
       null
     } else {
       val r = right.eval(input)
-      if (r == null) null else l == r
+      if (r == null) null
+      else if (left.dataType != BinaryType) l == r
+      else BinaryType.ordering.compare(
+        l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
     }
   }
 }
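
The extra `BinaryType` branch exists because, on the JVM, `==` on byte arrays compares references rather than contents. A small standalone sketch of the behavior the fix works around:

    // Two equal-valued byte arrays are different objects, so plain == is false:
    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3)
    println(a == b)                        // false (reference comparison)
    println(java.util.Arrays.equals(a, b)) // true (element-wise comparison)
    // The diff above gets the element-wise semantics inside EqualTo by delegating to
    // BinaryType.ordering.compare(a, b) == 0 for BinaryType operands.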

sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

Lines changed: 67 additions & 1 deletion
@@ -19,6 +19,7 @@ package org.apache.spark.sql.types
 
 import java.sql.Timestamp
 
+import scala.collection.mutable.ArrayBuffer
 import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
@@ -29,6 +30,7 @@ import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
@@ -159,7 +161,6 @@ object DataType {
       case failure: NoSuccess =>
         throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
     }
-
   }
 
   protected[types] def buildFormattedString(
@@ -754,6 +755,57 @@ object StructType {
   def apply(fields: java.util.List[StructField]): StructType = {
     StructType(fields.toArray.asInstanceOf[Array[StructField]])
   }
+
+  private[sql] def merge(left: DataType, right: DataType): DataType =
+    (left, right) match {
+      case (ArrayType(leftElementType, leftContainsNull),
+        ArrayType(rightElementType, rightContainsNull)) =>
+        ArrayType(
+          merge(leftElementType, rightElementType),
+          leftContainsNull || rightContainsNull)
+
+      case (MapType(leftKeyType, leftValueType, leftContainsNull),
+        MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+        MapType(
+          merge(leftKeyType, rightKeyType),
+          merge(leftValueType, rightValueType),
+          leftContainsNull || rightContainsNull)
+
+      case (StructType(leftFields), StructType(rightFields)) =>
+        val newFields = ArrayBuffer.empty[StructField]
+
+        leftFields.foreach {
+          case leftField @ StructField(leftName, leftType, leftNullable, _) =>
+            rightFields
+              .find(_.name == leftName)
+              .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
+                leftField.copy(
+                  dataType = merge(leftType, rightType),
+                  nullable = leftNullable || rightNullable)
+              }
+              .orElse(Some(leftField))
+              .foreach(newFields += _)
+        }
+
+        rightFields
+          .filterNot(f => leftFields.map(_.name).contains(f.name))
+          .foreach(newFields += _)
+
+        StructType(newFields)
+
+      case (DecimalType.Fixed(leftPrecision, leftScale),
+        DecimalType.Fixed(rightPrecision, rightScale)) =>
+        DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))
+
+      case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
+        if leftUdt.userClass == rightUdt.userClass => leftUdt
+
+      case (leftType, rightType) if leftType == rightType =>
+        leftType
+
+      case _ =>
+        throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+    }
 }
 
 
@@ -890,6 +942,20 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
     s"struct<${fieldTypes.mkString(",")}>"
   }
+
+  /**
+   * Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
+   * B from `that`,
+   *
+   * 1. If A and B have the same name and data type, they are merged to a field C with the same name
+   *    and data type. C is nullable if and only if either A or B is nullable.
+   * 2. If A doesn't exist in `that`, it's included in the result schema.
+   * 3. If B doesn't exist in `this`, it's also included in the result schema.
+   * 4. Otherwise, `this` and `that` are considered as conflicting schemas and an exception would be
+   *    thrown.
+   */
+  private[sql] def merge(that: StructType): StructType =
+    StructType.merge(this, that).asInstanceOf[StructType]
 }
 
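
To make the documented merge rules concrete, here is a small sketch of two schemas and the result `merge` should produce for them (the field names are made up; `merge` is `private[sql]`, so calling it directly only works from code inside the `org.apache.spark.sql` packages):

    import org.apache.spark.sql.types._

    val left = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))

    val right = StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("score", DoubleType, nullable = false)))

    // left.merge(right) should yield, per rules 1-3 above:
    //   id:    IntegerType, nullable = true   (present in both; nullable because one side is)
    //   name:  StringType,  nullable = true   (only in `left`, kept as-is)
    //   score: DoubleType,  nullable = false  (only in `right`, appended)
    // Merging, say, an IntegerType `id` with a StringType `id` would instead hit rule 4 and
    // throw a SparkException.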

sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala

Lines changed: 5 additions & 1 deletion
@@ -295,7 +295,11 @@ private[sql] class DataFrameImpl protected[sql](
   }
 
   override def saveAsParquetFile(path: String): Unit = {
-    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+    if (sqlContext.conf.parquetUseDataSourceApi) {
+      save("org.apache.spark.sql.parquet", "path" -> path)
+    } else {
+      sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+    }
   }
 
   override def saveAsTable(tableName: String): Unit = {
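
On the write side, `saveAsParquetFile` now routes through the data source `save` path when the flag is on. A short sketch (assuming an existing DataFrame `df`; the output path is hypothetical):

    // With spark.sql.parquet.useDataSourceApi = "true" (the default), this goes through
    // save("org.apache.spark.sql.parquet", "path" -> "/tmp/events_out");
    // with the flag off it falls back to the old WriteToFile plan.
    df.saveAsParquetFile("/tmp/events_out")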

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 5 additions & 0 deletions
@@ -37,6 +37,7 @@ private[spark] object SQLConf {
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+  val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
   val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
@@ -105,6 +106,10 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def parquetFilterPushDown =
     getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
 
+  /** When true uses Parquet implementation based on data source API */
+  private[spark] def parquetUseDataSourceApi =
+    getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+
   /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
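
Toggling the new flag from user code is an ordinary SQL configuration setting; a minimal sketch (assuming an already constructed SQLContext named `sqlContext`):

    // Fall back to the old Parquet implementation:
    sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
    // Re-enable the data source based implementation (the default):
    sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "true")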

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 13 additions & 7 deletions
@@ -20,14 +20,13 @@ package org.apache.spark.sql
 import java.beans.Introspector
 import java.util.Properties
 
-import scala.collection.immutable
 import scala.collection.JavaConversions._
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, Partition}
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
@@ -36,11 +35,12 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.json._
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.json._
+import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.{Partition, SparkContext}
 
 /**
  * :: AlphaComponent ::
@@ -303,8 +303,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   *
   * @group userf
   */
-  def parquetFile(path: String): DataFrame =
-    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
+  @scala.annotation.varargs
+  def parquetFile(path: String, paths: String*): DataFrame =
+    if (conf.parquetUseDataSourceApi) {
+      baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
+    } else {
+      DataFrame(this, parquet.ParquetRelation(
+        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+    }
 
   /**
   * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
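
With the varargs signature, several Parquet paths can be read into a single DataFrame; a brief sketch (paths are hypothetical, and `sqlContext` is an existing SQLContext):

    // Single path, same as before:
    val users = sqlContext.parquetFile("/data/users")

    // Multiple paths; with spark.sql.parquet.useDataSourceApi enabled they are all handed
    // to ParquetRelation2, whose part-file schemas may also be merged as described above.
    val events = sqlContext.parquetFile("/data/events/year=2014", "/data/events/year=2015")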

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 5 deletions
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
 
 
 private[sql] class DefaultSource
-  extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider {
+  extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
 
   /** Returns a new base relation with the parameters. */
   override def createRelation(
override def createRelation(

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 3 additions & 6 deletions
@@ -159,7 +159,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     val attributesSize = attributes.size
     if (attributesSize > record.size) {
       throw new IndexOutOfBoundsException(
-        s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
+        s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
     }
 
     var index = 0
@@ -325,7 +325,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
     val attributesSize = attributes.size
     if (attributesSize > record.size) {
       throw new IndexOutOfBoundsException(
-        s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
+        s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
     }
 
     var index = 0
@@ -348,10 +348,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       index: Int): Unit = {
     ctype match {
       case StringType => writer.addBinary(
-        Binary.fromByteArray(
-          record(index).asInstanceOf[String].getBytes("utf-8")
-        )
-      )
+        Binary.fromByteArray(record(index).asInstanceOf[String].getBytes("utf-8")))
       case BinaryType => writer.addBinary(
         Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
       case IntegerType => writer.addInteger(record.getInt(index))
