Skip to content

Commit bad9f33

Browse files
committed
Fix tests.
1 parent b18ab65 commit bad9f33

File tree

2 files changed

+25
-22
lines changed

2 files changed

+25
-22
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ trait CodegenSupport extends SparkPlan {
   private def variablePrefix: String = this match {
     case _: TungstenAggregate => "agg"
     case _: BroadcastHashJoin => "bhj"
+    case _: PhysicalRDD => "rddScan"
     case _ => nodeName.toLowerCase
   }

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -73,32 +73,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
       bucketValues: Seq[Integer],
       filterCondition: Column,
       originalDataFrame: DataFrame): Unit = {
+    // This test verifies parts of the plan. Disable whole stage codegen.
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
+      val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
+      // Limit: bucket pruning only works when the bucket column has one and only one column
+      assert(bucketColumnNames.length == 1)
+      val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
+      val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
+      val matchedBuckets = new BitSet(numBuckets)
+      bucketValues.foreach { value =>
+        matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
+      }

-    val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k")
-    val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec
-    // Limit: bucket pruning only works when the bucket column has one and only one column
-    assert(bucketColumnNames.length == 1)
-    val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)
-    val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex)
-    val matchedBuckets = new BitSet(numBuckets)
-    bucketValues.foreach { value =>
-      matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value))
-    }
+      // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
+      val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
+      val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+      assert(rdd.isDefined, plan)

-    // Filter could hide the bug in bucket pruning. Thus, skipping all the filters
-    val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-      .find(_.isInstanceOf[PhysicalRDD])
-    assert(rdd.isDefined)
+      val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
+        if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
+      }
+      // checking if all the pruned buckets are empty
+      assert(checkedResult.collect().forall(_ == true))

-    val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) =>
-      if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty)
+      checkAnswer(
+        bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
+        originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
     }
-    // checking if all the pruned buckets are empty
-    assert(checkedResult.collect().forall(_ == true))
-
-    checkAnswer(
-      bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"),
-      originalDataFrame.filter(filterCondition).orderBy("i", "j", "k"))
   }

   test("read partitioning bucketed tables with bucket pruning filters") {

0 commit comments

Comments (0)