Skip to content

Commit 57e97fc

Browse files
committed
[SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of LogicalRelation
## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, the `LogicalRelation` is replaced with a pruned one. However, this replacement may change the output of the `LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes that by passing the original relation's output as the `expectedOutputAttributes` of the pruned `LogicalRelation`.

## How was this patch tested?

With the new `PruneFileSourcePartitionsSuite`.

Author: Wenchen Fan <[email protected]>

Closes #15569 from cloud-fan/partition-bug.
1 parent 3180272 commit 57e97fc

File tree

5 files changed

+85
-7
lines changed

5 files changed

+85
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
102102
* Given the partition schema, returns a row with that schema holding the partition values.
103103
*/
104104
def toRow(partitionSchema: StructType): InternalRow = {
105-
InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
106-
Cast(Literal(spec(name)), dataType).eval()
105+
InternalRow.fromSeq(partitionSchema.map { field =>
106+
Cast(Literal(spec(field.name)), field.dataType).eval()
107107
})
108108
}
109109
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
5959
val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
6060
val prunedFsRelation =
6161
fsRelation.copy(location = prunedFileCatalog)(sparkSession)
62-
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
62+
val prunedLogicalRelation = logicalRelation.copy(
63+
relation = prunedFsRelation,
64+
expectedOutputAttributes = Some(logicalRelation.output))
6365

6466
// Keep partition-pruning predicates so that they are visible in physical planning
6567
val filterExpression = filters.reduceLeft(And)

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
2020
import java.io.File
2121

2222
import org.apache.spark.metrics.source.HiveCatalogMetrics
23+
import org.apache.spark.sql.QueryTest
2324
import org.apache.spark.sql.hive.test.TestHiveSingleton
25+
import org.apache.spark.sql.internal.SQLConf
2426
import org.apache.spark.sql.test.SQLTestUtils
25-
import org.apache.spark.sql.QueryTest
2627

2728
class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
2829
test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
7879
}
7980

8081
test("lazy partition pruning reads only necessary partition data") {
81-
withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
82+
withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
8283
withTable("test") {
8384
withTempDir { dir =>
8485
setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUt
114115
}
115116

116117
test("all partitions read and cached when filesource partition pruning is off") {
117-
withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
118+
withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
118119
withTable("test") {
119120
withTempDir { dir =>
120121
setupPartitionedTable("test", dir)

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
2222
import org.apache.spark.SparkException
2323
import org.apache.spark.sql.QueryTest
2424
import org.apache.spark.sql.hive.test.TestHiveSingleton
25+
import org.apache.spark.sql.internal.SQLConf
2526
import org.apache.spark.sql.test.SQLTestUtils
2627

2728
/**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
6263

6364
def testCaching(pruningEnabled: Boolean): Unit = {
6465
test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
65-
withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> pruningEnabled.toString) {
66+
withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
6667
withTable("test") {
6768
withTempDir { dir =>
6869
spark.range(5).selectExpr("id", "id as f1", "id as f2").write
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.execution
19+
20+
import org.apache.spark.sql.QueryTest
21+
import org.apache.spark.sql.catalyst.dsl.expressions._
22+
import org.apache.spark.sql.catalyst.dsl.plans._
23+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
24+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
25+
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
26+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
27+
import org.apache.spark.sql.hive.test.TestHiveSingleton
28+
import org.apache.spark.sql.test.SQLTestUtils
29+
import org.apache.spark.sql.types.StructType
30+
31+
class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
32+
33+
object Optimize extends RuleExecutor[LogicalPlan] {
34+
val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
35+
}
36+
37+
test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
38+
withTable("test") {
39+
withTempDir { dir =>
40+
sql(
41+
s"""
42+
|CREATE EXTERNAL TABLE test(i int)
43+
|PARTITIONED BY (p int)
44+
|STORED AS parquet
45+
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)
46+
47+
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
48+
val tableFileCatalog = new TableFileCatalog(
49+
spark,
50+
tableMeta.database,
51+
tableMeta.identifier.table,
52+
Some(tableMeta.partitionSchema),
53+
0)
54+
55+
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
56+
tableMeta.partitionColumnNames.contains(f.name)
57+
})
58+
val relation = HadoopFsRelation(
59+
location = tableFileCatalog,
60+
partitionSchema = tableMeta.partitionSchema,
61+
dataSchema = dataSchema,
62+
bucketSpec = None,
63+
fileFormat = new ParquetFileFormat(),
64+
options = Map.empty)(sparkSession = spark)
65+
66+
val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
67+
val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
68+
69+
val optimized = Optimize.execute(query)
70+
assert(optimized.missingInput.isEmpty)
71+
}
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)