Skip to content

Conversation

@lianhuiwang
Copy link
Contributor

@lianhuiwang lianhuiwang commented Jul 9, 2016

What changes were proposed in this pull request?

In TPCDS-Q14 the same physical plan of uncorrelated scalar subqueries from a CTE could be executed multiple times, we should re-use the same result to avoid the duplicated computing.
Before

scala> (1 to 3).map(i => (i, i)).toDF("key", "value").createOrReplaceTempView("t1")
scala> sql("WITH max_test AS( SELECT max(key) as max_key FROM t ) SELECT key FROM t1 WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test)").explain
== Physical Plan ==
*Project [_1#200 AS key#203]
+- *Filter ((_1#200 = subquery#209) && (_2#201 = subquery#210))
   :  :- Subquery subquery#209
   :  :  +- *HashAggregate(keys=[], functions=[max(key#203)], output=[max_key#211])
   :  :     +- Exchange SinglePartition
   :  :        +- *HashAggregate(keys=[], functions=[partial_max(key#203)], output=[max#217])
   :  :           +- LocalTableScan [key#203]
   :  +- Subquery subquery#210
   :     +- *HashAggregate(keys=[], functions=[max(key#203)], output=[max_key#211])
   :        +- Exchange SinglePartition
   :           +- *HashAggregate(keys=[], functions=[partial_max(key#203)], output=[max#219])
   :              +- LocalTableScan [key#203]
   +- LocalTableScan [_1#200, _2#201]

After

scala> (1 to 3).map(i => (i, i)).toDF("key", "value").createOrReplaceTempView("t1")
scala> sql("WITH max_test AS( SELECT max(key) as max_key FROM t ) SELECT key FROM t1 WHERE key = (SELECT max_key FROM max_test) and value = (SELECT max_key FROM max_test)").explain
== Physical Plan ==
*Project [_1#200 AS key#203]
+- *Filter ((_1#200 = subquery#209) && (_2#201 = ReusedSubquery#210(subquery#209)))
   :  +- Subquery subquery#209
   :     +- *HashAggregate(keys=[], functions=[max(key#203)], output=[max_key#211])
   :        +- Exchange SinglePartition
   :           +- *HashAggregate(keys=[], functions=[partial_max(key#203)], output=[max#217])
   :              +- LocalTableScan [key#203]
   +- LocalTableScan [_1#200, _2#201]

How was this patch tested?

Pass the Jenkins tests (including a new testcase).

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62007 has finished for PR 14111 at commit 5290e42.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ReusedScalarSubquery(

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62010 has finished for PR 14111 at commit b1914de.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62017 has finished for PR 14111 at commit 77ea002.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62018 has finished for PR 14111 at commit 0311542.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 9, 2016

Test build #62021 has finished for PR 14111 at commit c4bb273.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 10, 2016

Test build #62056 has finished for PR 14111 at commit 1d7bd3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Copy link
Contributor Author

cc @rxin @hvanhovell @cloud-fan

def subqueries: Seq[PlanType] = {
expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
expressions.flatMap(_.collect {
case e: SubqueryExpression => e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use ExpressionCanonicalizer to canonicalize the expression before call distinct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we use ReusedScalarSubquery, not Alias to indicate the reused SubqueryExpression, I think we don't use ExpressionCanonicalizer.

@cloud-fan
Copy link
Contributor

I have a simpler idea to implement this feature:

  1. in SparkPlan, build a map from distinct subqueries to future result, and still keep the original subquery list.
  2. in SparkPlan.prepareSubqueries, iterate the map and execute these distinct subqueries.
  3. in SparkPlan.waitForSubqueries, iterate the original subquery list, get the result by searching the map, and then call updateResult.

I think this approach need much less code change, what do you think?

@lianhuiwang
Copy link
Contributor Author

lianhuiwang commented Jul 13, 2016

@cloud-fan At firstly I have implemented it with you said. But the following situation that has broadcast join will have a error 'ScalarSubquery has not finished', example (from SPARK-14791):
val df = (1 to 3).map(i => (i, i)).toDF("key", "value")
df.createOrReplaceTempView("t1")
df.createOrReplaceTempView("t2")
df.createOrReplaceTempView("t3")
val q = sql("select * from t1 join (select key, value from t2 " +
" where key > (select avg (key) from t3))t on (t1.key = t.key)")
Before:

*BroadcastHashJoin [key#5], [key#26], Inner, BuildRight
:- *Project [_1#2 AS key#5, _2#3 AS value#6]
:  +- *Filter (cast(_1#2 as double) > subquery#13)
:     :  +- Subquery subquery#13
:     :     +- *HashAggregate(keys=[], functions=[avg(cast(key#5 as bigint))], output=[avg(key)#25])
:     :        +- Exchange SinglePartition
:     :           +- *HashAggregate(keys=[], functions=[partial_avg(cast(key#5 as bigint))], output=[sum#30, count#31L])
:     :              +- LocalTableScan [key#5]
:     +- LocalTableScan [_1#2, _2#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- *Project [_1#2 AS key#26, _2#3 AS value#27]
      +- *Filter (cast(_1#2 as double) > subquery#13)
         :  +- Subquery subquery#13
         :     +- *HashAggregate(keys=[], functions=[avg(cast(key#5 as bigint))], output=[avg(key)#25])
         :        +- Exchange SinglePartition
         :           +- *HashAggregate(keys=[], functions=[partial_avg(cast(key#5 as bigint))], output=[sum#30, count#31L])
         :              +- LocalTableScan [key#5]
         +- LocalTableScan [_1#2, _2#3]

The steps are as follows:

  1. BroadcastHashJoin.prepare()
  2. t1.Filter.prepareSubqueries, it will prepare subquery.
  3. BroadcastExchange.prepare()
  4. t2.Filter.prepareSubqueries, it will prepare subquery.
  5. BroadcastExchange.doPrepare(), it is in prepare() and will call child.executeCollect().
  6. t2.Filter.execute()
  7. t2.Filter.waitForSubqueries(), it will wait for subquery.
  8. BroadcastHashJoin.doExecute()
  9. BroadcastExchange.executeBroadcast()
  10. t1.Filter.execute()
  11. t1.Filter.waitForSubqueries(), it will wait for subquery.
    Before this PR there are two different subqueries, they cannot wait for other's results.

But after this PR, they are the same subquery, the steps are as follows:

  1. t1.Filter.prepareSubqueries, it will prepare subquery.
  2. t2.Filter.prepareSubqueries, it will do not submit subquery's execute().
  3. t2.Filter.waitForSubqueries(), it will can wait for subquery that step-1 have submitted before.
  4. t1.Filter.waitForSubqueries(), it do not await subquery's results because step-3 have updated.
    So I make some logical codes to ScalarSubquery in order to deal with it.

@cloud-fan
Copy link
Contributor

@lianhuiwang, after taking a look at this example, I think this is a very special case: 2 physical plans reference to one same subquery(same instance). However, I don't think this is a valid case, I'd rather treat it as a bug of constraints propagation. Except this case, do we have another case that the simpler approach can't handle?

@lianhuiwang
Copy link
Contributor Author

lianhuiwang commented Jul 25, 2016

@cloud-fan I don't think it is a bug of constraints propagation because filter with the uncorrelated scalar subquery needs to push down due to it can filter many records.
In addition, the following query(more like TPCDS-Q14):

with  avg_table as
(select avg (key) as avg_key from t3)
select 1 as col
from t3 where key > (select avg_key from avg_table)
union all
select 1 as col from t1 join (select key, value from t2
where key > (select avg_key from avg_table))t on (t1.key = t.key)

When BroadcastExchangeExec has one same subquery that also appears in other places of this query, The first place will at firstly prepare subquery, But the second place in BroadcastExchangeExec will firstly wait for subquery because BroadcastExchangeExec.doPrepare will execute child plan.
So BroadcastExchangeExec's child plan needs to wait for the subquery Results that the first place has been submitted.

@cloud-fan
Copy link
Contributor

ah you are right, I think we need a better approach to execute subqueries and reuse the results globally. cc @hvanhovell to comment more on this.

@viirya
Copy link
Member

viirya commented Aug 12, 2016

I think this is duplicate to the merged one #14548?

@hvanhovell
Copy link
Contributor

@lianhuiwang we have merged PR #14548 implementing similar functionality. Could you close this PR? Thanks for your work!

@lianhuiwang
Copy link
Contributor Author

OK. Thanks.

@lianhuiwang lianhuiwang closed this Sep 1, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants