@@ -19,13 +19,17 @@ package org.apache.spark.sql
1919
2020import org .apache .spark .sql .catalyst .expressions .{Alias , BloomFilterMightContain , Literal }
2121import org .apache .spark .sql .catalyst .expressions .aggregate .{AggregateExpression , BloomFilterAggregate }
22+ import org .apache .spark .sql .catalyst .optimizer .MergeScalarSubqueries
2223import org .apache .spark .sql .catalyst .plans .LeftSemi
2324import org .apache .spark .sql .catalyst .plans .logical .{Aggregate , Filter , Join , LogicalPlan }
25+ import org .apache .spark .sql .execution .{ReusedSubqueryExec , SubqueryExec }
26+ import org .apache .spark .sql .execution .adaptive .{AdaptiveSparkPlanHelper , AQEPropagateEmptyRelation }
2427import org .apache .spark .sql .internal .SQLConf
2528import org .apache .spark .sql .test .{SharedSparkSession , SQLTestUtils }
2629import org .apache .spark .sql .types .{IntegerType , StructType }
2730
28- class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSparkSession {
31+ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSparkSession
32+ with AdaptiveSparkPlanHelper {
2933
3034 protected override def beforeAll (): Unit = {
3135 super .beforeAll()
@@ -201,9 +205,16 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
201205 sql(" analyze table bf4 compute statistics for columns a4, b4, c4, d4, e4, f4" )
202206 sql(" analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5" )
203207 sql(" analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5" )
208+
209+ // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
210+ // complicated.
211+ conf.setConfString(SQLConf .OPTIMIZER_EXCLUDED_RULES .key, MergeScalarSubqueries .ruleName)
204212 }
205213
206214 protected override def afterAll (): Unit = try {
215+ conf.setConfString(SQLConf .OPTIMIZER_EXCLUDED_RULES .key,
216+ SQLConf .OPTIMIZER_EXCLUDED_RULES .defaultValueString)
217+
207218 sql(" DROP TABLE IF EXISTS bf1" )
208219 sql(" DROP TABLE IF EXISTS bf2" )
209220 sql(" DROP TABLE IF EXISTS bf3" )
@@ -264,24 +275,28 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
264275 }
265276 }
266277
267- // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan, but the subqueries will
268- // be reused in the physical plan.
269- def getNumBloomFilters (plan : LogicalPlan , scalarSubqueryCTEMultiplicator : Int = 1 ): Integer = {
270- val numBloomFilterAggs = plan.collectWithSubqueries {
271- case Aggregate (_, aggregateExpressions, _) =>
272- aggregateExpressions.collect {
273- case Alias (AggregateExpression (bfAgg : BloomFilterAggregate , _, _, _, _), _) =>
274- assert(bfAgg.estimatedNumItemsExpression.isInstanceOf [Literal ])
275- assert(bfAgg.numBitsExpression.isInstanceOf [Literal ])
276- 1
278+ def getNumBloomFilters (plan : LogicalPlan ): Integer = {
279+ val numBloomFilterAggs = plan.collect {
280+ case Filter (condition, _) => condition.collect {
281+ case subquery : org.apache.spark.sql.catalyst.expressions.ScalarSubquery
282+ => subquery.plan.collect {
283+ case Aggregate (_, aggregateExpressions, _) =>
284+ aggregateExpressions.map {
285+ case Alias (AggregateExpression (bfAgg : BloomFilterAggregate , _, _, _, _),
286+ _) =>
287+ assert(bfAgg.estimatedNumItemsExpression.isInstanceOf [Literal ])
288+ assert(bfAgg.numBitsExpression.isInstanceOf [Literal ])
289+ 1
290+ }.sum
277291 }.sum
292+ }.sum
278293 }.sum
279294 val numMightContains = plan.collect {
280295 case Filter (condition, _) => condition.collect {
281296 case BloomFilterMightContain (_, _) => 1
282297 }.sum
283298 }.sum
284- assert(numBloomFilterAggs == numMightContains * scalarSubqueryCTEMultiplicator )
299+ assert(numBloomFilterAggs == numMightContains)
285300 numMightContains
286301 }
287302
@@ -385,7 +400,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
385400 planEnabled = sql(query).queryExecution.optimizedPlan
386401 checkAnswer(sql(query), expectedAnswer)
387402 }
388- assert(getNumBloomFilters(planEnabled, 2 ) == getNumBloomFilters(planDisabled) + 2 )
403+ assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled) + 2 )
389404 }
390405 }
391406
@@ -413,10 +428,10 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
413428 checkAnswer(sql(query), expectedAnswer)
414429 }
415430 if (numFilterThreshold < 3 ) {
416- assert(getNumBloomFilters(planEnabled, numFilterThreshold ) ==
417- getNumBloomFilters(planDisabled) + numFilterThreshold)
431+ assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled)
432+ + numFilterThreshold)
418433 } else {
419- assert(getNumBloomFilters(planEnabled, 2 ) == getNumBloomFilters(planDisabled) + 2 )
434+ assert(getNumBloomFilters(planEnabled) == getNumBloomFilters(planDisabled) + 2 )
420435 }
421436 }
422437 }
@@ -561,4 +576,30 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
561576 """ .stripMargin)
562577 }
563578 }
579+
580+ test(" Merge runtime bloom filters" ) {
581+ withSQLConf(SQLConf .RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD .key -> " 3000" ,
582+ SQLConf .AUTO_BROADCASTJOIN_THRESHOLD .key -> " 2000" ,
583+ SQLConf .RUNTIME_FILTER_SEMI_JOIN_REDUCTION_ENABLED .key -> " false" ,
584+ SQLConf .RUNTIME_BLOOM_FILTER_ENABLED .key -> " true" ,
585+ // Re-enable `MergeScalarSubqueries`
586+ SQLConf .OPTIMIZER_EXCLUDED_RULES .key -> " " ,
587+ SQLConf .ADAPTIVE_OPTIMIZER_EXCLUDED_RULES .key -> AQEPropagateEmptyRelation .ruleName) {
588+
589+ val query = " select * from bf1 join bf2 on bf1.c1 = bf2.c2 and " +
590+ " bf1.b1 = bf2.b2 where bf2.a2 = 62"
591+ val df = sql(query)
592+ df.collect()
593+ val plan = df.queryExecution.executedPlan
594+
595+ val subqueryIds = collectWithSubqueries(plan) { case s : SubqueryExec => s.id }
596+ val reusedSubqueryIds = collectWithSubqueries(plan) {
597+ case rs : ReusedSubqueryExec => rs.child.id
598+ }
599+
600+ assert(subqueryIds.size == 1 , " Missing or unexpected SubqueryExec in the plan" )
601+ assert(reusedSubqueryIds.size == 1 ,
602+ " Missing or unexpected reused ReusedSubqueryExec in the plan" )
603+ }
604+ }
564605}
0 commit comments