
Commit c025c3d

liancheng authored and rxin committed
[SPARK-9095] [SQL] Removes the old Parquet support
This PR removes the old Parquet support:

- Removes the old `ParquetRelation` together with related SQL configuration, plan nodes, strategies, utility classes, and test suites.
- Renames `ParquetRelation2` to `ParquetRelation`.
- Renames `RowReadSupport` and `RowRecordMaterializer` to `CatalystReadSupport` and `CatalystRecordMaterializer` respectively, and moves them to separate files. This follows the naming convention used by other Parquet data models implemented in parquet-mr, and should make the code easier to follow for developers already familiar with Parquet.

There is still other code that can be cleaned up, especially `RowWriteSupport`, but I'd like to leave that part to SPARK-8848.

Author: Cheng Lian <[email protected]>

Closes #7441 from liancheng/spark-9095 and squashes the following commits:

c7b6e38 [Cheng Lian] Removes WriteToFile
2d688d6 [Cheng Lian] Renames ParquetRelation2 to ParquetRelation
ca9e1b7 [Cheng Lian] Removes old Parquet support
1 parent 6b2baec commit c025c3d

27 files changed (+1037, -2152 lines)
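
As a quick illustration of the user-facing effect (the sqlContext value and the paths below are assumptions for the example, not code from this commit): with the old code path and its spark.sql.parquet.useDataSourceApi switch gone, Parquet reads and writes always go through the data source API.

    // There is no longer a flag for choosing between Parquet code paths.
    val events = sqlContext.read.parquet("/data/events")   // backed by the renamed ParquetRelation
    events.write.parquet("/data/events-copy")              // data source API writer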

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 0 additions & 6 deletions
@@ -186,12 +186,6 @@ case class WithWindowDefinition(
   override def output: Seq[Attribute] = child.output
 }
 
-case class WriteToFile(
-    path: String,
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
 /**
  * @param order The ordering expressions
  * @param global True means global sorting apply for entire data set,

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 2 additions & 7 deletions
@@ -139,8 +139,7 @@ class DataFrame private[sql](
     // happen right away to let these side effects take place eagerly.
     case _: Command |
          _: InsertIntoTable |
-         _: CreateTableUsingAsSelect |
-         _: WriteToFile =>
+         _: CreateTableUsingAsSelect =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
     case _ =>
       queryExecution.analyzed
@@ -1615,11 +1614,7 @@ class DataFrame private[sql](
    */
   @deprecated("Use write.parquet(path)", "1.4.0")
   def saveAsParquetFile(path: String): Unit = {
-    if (sqlContext.conf.parquetUseDataSourceApi) {
-      write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
-    } else {
-      sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
-    }
+    write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
   }
 
   /**
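
The hunk above shows that the deprecated saveAsParquetFile now unconditionally delegates to the data source writer. A minimal sketch of the two spellings (the df value and the path are hypothetical):

    // Deprecated spelling; after this change it always routes through the data source writer.
    df.saveAsParquetFile("/tmp/people.parquet")

    // Replacement suggested by the @deprecated message. Pick one or the other: both default to
    // ErrorIfExists semantics, so writing the same path twice fails.
    df.write.parquet("/tmp/people.parquet")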

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 4 additions & 4 deletions
@@ -21,16 +21,16 @@ import java.util.Properties
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{Logging, Partition}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Logging, Partition}
 
 /**
  * :: Experimental ::
@@ -259,7 +259,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
     }.toArray
 
     sqlContext.baseRelationToDataFrame(
-      new ParquetRelation2(
+      new ParquetRelation(
         globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
   }
 }
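
The snippet above is what sqlContext.read.parquet(...) ultimately executes: the supplied paths are globbed and passed, together with any extra reader options, to the renamed ParquetRelation. A hedged usage sketch (the paths and the mergeSchema option value are illustrative assumptions):

    val logs = sqlContext.read
      .option("mergeSchema", "true")   // forwarded through extraOptions.toMap to ParquetRelation
      .parquet("/data/logs/2015-07-01", "/data/logs/2015-07-02")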

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 0 additions & 6 deletions
@@ -276,10 +276,6 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "Enables Parquet filter push-down optimization when set to true.")
 
-  val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
-    defaultValue = Some(true),
-    doc = "<TODO>")
-
   val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
     key = "spark.sql.parquet.followParquetFormatSpec",
     defaultValue = Some(false),
@@ -456,8 +452,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
-  private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)
-
   private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 5 deletions
@@ -870,7 +870,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
       LeftSemiJoin ::
       HashJoin ::
       InMemoryScans ::
-      ParquetOperations ::
       BasicOperators ::
       CartesianProduct ::
       BroadcastNestedLoopJoin :: Nil)
@@ -1115,11 +1114,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def parquetFile(paths: String*): DataFrame = {
     if (paths.isEmpty) {
       emptyDataFrame
-    } else if (conf.parquetUseDataSourceApi) {
-      read.parquet(paths : _*)
     } else {
-      DataFrame(this, parquet.ParquetRelation(
-        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+      read.parquet(paths : _*)
     }
   }
 
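
With the branch on parquetUseDataSourceApi removed, the deprecated parquetFile method is now a thin wrapper over the reader API. A small sketch of the equivalence (the path is hypothetical):

    // Both calls now take exactly the same code path.
    val viaDeprecatedMethod = sqlContext.parquetFile("/data/events")
    val viaReader = sqlContext.read.parquet("/data/events")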

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 3 additions & 55 deletions
@@ -17,19 +17,18 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
-import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
@@ -306,57 +305,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  object ParquetOperations extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      // TODO: need to support writing to other types of files. Unify the below code paths.
-      case logical.WriteToFile(path, child) =>
-        val relation =
-          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
-        // Note: overwrite=false because otherwise the metadata we just created will be deleted
-        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
-      case logical.InsertIntoTable(
-          table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
-        val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
-        val filtersToPush = filters.filter { pred =>
-          val referencedColNames = pred.references.map(_.name).toSet
-          referencedColNames.intersect(partitionColNames).isEmpty
-        }
-        val prunePushedDownFilters =
-          if (sqlContext.conf.parquetFilterPushDown) {
-            (predicates: Seq[Expression]) => {
-              // Note: filters cannot be pushed down to Parquet if they contain more complex
-              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
-              // filters that have been pushed down. Note that a predicate such as "(A AND B) OR C"
-              // can result in "A OR C" being pushed down. Here we are conservative in the sense
-              // that even if "A" was pushed and we check for "A AND B" we still want to keep
-              // "A AND B" in the higher-level filter, not just "B".
-              predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
-                case (predicate, None) => predicate
-                // Filter needs to be applied above when it contains partitioning
-                // columns
-                case (predicate, _)
-                  if !predicate.references.map(_.name).toSet.intersect(partitionColNames).isEmpty =>
-                  predicate
-              }
-            }
-          } else {
-            identity[Seq[Expression]] _
-          }
-        pruneFilterProject(
-          projectList,
-          filters,
-          prunePushedDownFilters,
-          ParquetTableScan(
-            _,
-            relation,
-            if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
-
-      case _ => Nil
-    }
-  }
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>

sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
+import org.apache.parquet.io.api.RecordMaterializer
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
+
+    val toCatalyst = new CatalystSchemaConverter(conf)
+    val parquetRequestedSchema = readContext.getRequestedSchema
+
+    val catalystRequestedSchema =
+      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
+        metadata
+          // First tries to read requested schema, which may result from projections
+          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+          // If not available, tries to read Catalyst schema from file metadata. It's only
+          // available if the target file is written by Spark SQL.
+          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
+      }.map(StructType.fromString).getOrElse {
+        logDebug("Catalyst schema not available, falling back to Parquet schema")
+        toCatalyst.convert(parquetRequestedSchema)
+      }
+
+    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
+    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+  }
+
+  override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
+
+    // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
+    // schema of this file from its metadata.
+    val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
+
+    // Optional schema of requested columns, in the form of a string serialized from a Catalyst
+    // `StructType` containing all requested columns.
+    val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
+
+    // Below we construct a Parquet schema containing all requested columns. This schema tells
+    // Parquet which columns to read.
+    //
+    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
+    // we have to fall back to the full file schema which contains all columns in the file.
+    // Obviously this may waste IO bandwidth since it may read more columns than requested.
+    //
+    // Two things to note:
+    //
+    // 1. It's possible that some requested columns don't exist in the target Parquet file. For
+    //    example, in the case of schema merging, the globally merged schema may contain extra
+    //    columns gathered from other Parquet files. These columns will be simply filled with nulls
+    //    when actually reading the target Parquet file.
+    //
+    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
+    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
+    //    non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
+    //    containing a single integer array field `f1` may have the following legacy 2-level
+    //    structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          required INT32 element;
+    //        }
+    //      }
+    //
+    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
+    //
+    //      message root {
+    //        optional group f1 (LIST) {
+    //          repeated group list {
+    //            required INT32 element;
+    //          }
+    //        }
+    //      }
+    //
+    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
+    //    different physical structures.
+    val parquetRequestedSchema =
+      maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
+        val toParquet = new CatalystSchemaConverter(conf)
+        val fileSchema = context.getFileSchema.asGroupType()
+        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
+
+        StructType
+          // Deserializes the Catalyst schema of requested columns
+          .fromString(schemaString)
+          .map { field =>
+            if (fileFieldNames.contains(field.name)) {
+              // If the field exists in the target Parquet file, extracts the field type from the
+              // full file schema and makes a single-field Parquet schema
+              new MessageType("root", fileSchema.getType(field.name))
+            } else {
+              // Otherwise, just resorts to `CatalystSchemaConverter`
+              toParquet.convert(StructType(Array(field)))
+            }
+          }
+          // Merges all single-field Parquet schemas to form a complete schema for all requested
+          // columns. Note that it's possible that no columns are requested at all (e.g., count
+          // some partition column of a partitioned Parquet table). That's why `fold` is used here
+          // and always falls back to an empty Parquet schema.
+          .fold(new MessageType("root")) {
+            _ union _
+          }
+      }
+
+    val metadata =
+      Map.empty[String, String] ++
+        maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
+        maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
+
+    logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
+    new ReadContext(parquetRequestedSchema, metadata)
+  }
+}
+
+private[parquet] object CatalystReadSupport {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+}
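
The pruning step described in the long comment inside init can be sketched in isolation. The helper below is hypothetical and simplified, not code from this commit: requested columns missing from the file schema are simply skipped here (the real code converts them with CatalystSchemaConverter), one single-field MessageType is built per requested column found in the file, and the pieces are unioned, folding from an empty "root" schema so that a zero-column request still yields a valid Parquet schema.

    import scala.collection.JavaConversions.iterableAsScalaIterable

    import org.apache.parquet.schema.MessageType
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper mirroring the column-pruning step in CatalystReadSupport.init.
    def pruneFileSchema(fileSchema: MessageType, requestedSchema: StructType): MessageType = {
      val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
      requestedSchema
        .filter(field => fileFieldNames.contains(field.name))
        // One single-field Parquet schema per requested column present in the file
        .map(field => new MessageType("root", fileSchema.getType(field.name)))
        // Union the per-column schemas; folding from an empty "root" covers the no-column case
        .fold(new MessageType("root"))(_ union _)
    }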

sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
+import org.apache.parquet.schema.MessageType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[RecordMaterializer]] for Catalyst rows.
+ *
+ * @param parquetSchema Parquet schema of the records to be read
+ * @param catalystSchema Catalyst schema of the rows to be constructed
+ */
+private[parquet] class CatalystRecordMaterializer(
+    parquetSchema: MessageType, catalystSchema: StructType)
+  extends RecordMaterializer[InternalRow] {
+
+  private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+
+  override def getCurrentRecord: InternalRow = rootConverter.currentRow
+
+  override def getRootConverter: GroupConverter = rootConverter
+}

sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala

Lines changed: 5 additions & 0 deletions
@@ -570,6 +570,11 @@ private[parquet] object CatalystSchemaConverter {
       """.stripMargin.split("\n").mkString(" "))
   }
 
+  def checkFieldNames(schema: StructType): StructType = {
+    schema.fieldNames.foreach(checkFieldName)
+    schema
+  }
+
   def analysisRequire(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)
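
A brief usage sketch for the new helper (the schema below is hypothetical): checkFieldNames runs the existing checkFieldName check over every field name and returns the schema unchanged, so it can be applied inline before handing a schema to the Parquet writer.

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // Raises an AnalysisException if any field name is invalid for Parquet; otherwise the
    // schema comes back unchanged.
    val validated = CatalystSchemaConverter.checkFieldNames(schema)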
