Conversation

@viirya (Member) commented Aug 2, 2016

What changes were proposed in this pull request?

Design doc: https://issues.apache.org/jira/secure/attachment/12845016/de-duplicating%20subqueries.pdf

Subqueries in Spark SQL are executed separately even when they have the same physical plan and produce the same results. We should be able to deduplicate subqueries that are referenced many times within a query.

This change is motivated by an attempt to improve the performance of CTE subqueries; however, it is not limited to CTEs and can be applied to other subqueries in general. Note that because a CTE provides scope information, we can make better use of it to find common subqueries across the whole query plan.

For other subqueries, we rely on sameResult to decide whether two subqueries are the same common subquery. As we know, sameResult is not guaranteed to determine that two given plans produce the same results, so it is possible that two non-CTE subqueries which do produce the same results are not recognized as a common subquery.

Comparison with the existing result-reuse rule ReuseExchange

The planning rule ReuseExchange can find duplicated exchanges in the physical plan and reuse the same exchange for all the references, so we already reuse Broadcast and Shuffle results. Why do we need this PR to also reuse duplicated subqueries?
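
For context, this is roughly how the existing rule works. The sketch below is a lightly simplified paraphrase of Spark 2.x's ReuseExchange (the check of the reuse-enabled config is omitted): exchanges are grouped by schema, and an exchange whose plan satisfies sameResult with an earlier one is replaced by a ReusedExchangeExec pointing at that earlier exchange.

import scala.collection.mutable

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    // Candidate exchanges, grouped by schema so that sameResult is only
    // tested against plans that could possibly match.
    val exchanges = mutable.HashMap[StructType, mutable.ArrayBuffer[Exchange]]()
    plan transformUp {
      case exchange: Exchange =>
        val sameSchema =
          exchanges.getOrElseUpdate(exchange.schema, mutable.ArrayBuffer[Exchange]())
        sameSchema.find(_.sameResult(exchange)) match {
          // A duplicate exists: reference it instead of executing again.
          case Some(existing) => ReusedExchangeExec(exchange.output, existing)
          case None =>
            sameSchema += exchange
            exchange
        }
    }
  }
}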

The means we rely on to find duplicated exchanges is sameResult. Quoting from its comments:

Since its likely undecidable to generally determine if two given plans will produce the same results, it is okay for this function to return false, even if the results are actually the same.

That said, in some cases (if not most cases), the query plan cannot meet the requirements of sameResult for reusing a Shuffle or Broadcast, even when the two plans actually produce the same results. That is what we observed when benchmarking TPC-DS queries.

However, CTE subqueries provide good scope information (i.e., the subquery alias) that we can use to find duplicated query plans. E.g., for a query like:

WITH cte AS (SELECT * FROM src) SELECT * FROM cte a JOIN cte b

It is guaranteed that a and b are the same, so we can reuse the same result.

Besides, there are operators and situations that don't involve an Exchange, where we still need to de-duplicate identical subqueries.

Eliminating SubqueryAlias

Previously, we eliminated all SubqueryAlias operators at the beginning of the Optimizer. Now we only eliminate a SubqueryAlias that is referenced just once in the whole query plan. This is performed by the new rule EliminateOneTimeSubqueryAliases, sketched below.
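
The rule name is from this PR; the body below is only a minimal sketch of the behavior just described, using sameResult as the equivalence test (the PR's actual implementation may differ):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateOneTimeSubqueryAliases extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // Every SubqueryAlias anywhere in the query plan.
    val aliases = plan.collect { case s: SubqueryAlias => s }
    plan transformUp {
      // If no other alias in the plan produces the same result, this alias is
      // referenced only once, so strip it as the optimizer used to do.
      case s: SubqueryAlias if aliases.count(_.sameResult(s)) <= 1 => s.child
    }
  }
}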

De-duplicate common SubqueryAlias

Another new rule, DedupCommonSubqueries, is introduced to de-duplicate common subqueries before optimization.

Each SubqueryAlias has a child logical plan representing the subquery. We generate the executed plan for that logical plan and attach it to a CommonSubquery operator, which is just a wrapper around the executed plan.

Note that we first search for an existing identical logical plan, so for each distinct subquery logical plan there is only one executed plan, shared among many CommonSubquery operators. Once the executed plan has been run, its result is also kept and shared among these CommonSubquery operators.

Note that we only keep the common subqueries at the highest level in the query; that is, we strip common subqueries wrapped inside another common subquery. This simplifies the logic for optimizing filter/projection pushdown across such nested common subqueries, and the common subquery at the highest level should be the most expensive one to de-duplicate.
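
To make the mechanics concrete, here is a hedged sketch. The DedupCommonSubqueries signature below is the one this PR adds; the CommonSubquery fields and the bookkeeping are my assumptions, not the PR's actual code:

import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

// Placeholder for the PR's wrapper operator; its real fields are assumptions.
case class CommonSubquery(override val output: Seq[Attribute], executed: SparkPlan)
  extends LeafNode

case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    // Executed plans already produced for earlier, equivalent subqueries.
    val commonPlans = mutable.ArrayBuffer.empty[(LogicalPlan, SparkPlan)]
    plan transformDown {
      case s: SubqueryAlias =>
        commonPlans.find { case (logical, _) => logical.sameResult(s.child) } match {
          case Some((_, executed)) =>
            // An equivalent subquery was seen before: share its executed plan,
            // so it runs once and its result is reused.
            CommonSubquery(s.child.output, executed)
          case None =>
            val executed = new QueryExecution(sparkSession, s.child).executedPlan
            commonPlans += s.child -> executed
            CommonSubquery(s.child.output, executed)
        }
    }
  }
}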

Modifying sameResult of SubqueryAlias

This PR overrides the default sameResult method in SubqueryAlias: if two SubqueryAlias operators have child logical plans that generate the same results, sameResult on them returns true.
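
In outline, the override amounts to the following (a paraphrase against the Spark 2.x-era API, where sameResult is overridable; the stand-in class below is simplified, not the PR's exact code):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Simplified stand-in for Spark's SubqueryAlias (the real class has more fields).
case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  // Two aliases are "the same" whenever their children produce the same
  // results; the alias names themselves don't matter.
  override def sameResult(plan: LogicalPlan): Boolean = plan match {
    case SubqueryAlias(_, otherChild) => child.sameResult(otherChild)
    case _ => false
  }
}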

Optimization regarding common subqueries and operators on them

It is possible that we have a logical plan that includes a common subquery, like this:

Join
  Filter [year = 2000]
    SubqueryAlias a
      ...
  Filter [year = 2001]
    SubqueryAlias b
      ...

Optimization rules that push down Filter would not work here because of the additional SubqueryAlias operators. We know that Filter and Project pushdown can improve query performance a lot, so we introduce a new optimization rule, OptimizeCommonSubqueries.

Simply put, the rule finds the common subqueries. For each common subquery, we optimize all the subqueries producing the same results together with the operators on top of them. We consider two cases: first, all subqueries have a Project on top of them; second, all subqueries have a Filter on top of them.

For Project pushdown, we concatenate all the project lists and push the combined list down to all subqueries. For Filter pushdown, we combine all the filter conditions with an Or expression and push that down to all subqueries (see the sketch after the example). With this, the above query plan would become:

Join
  Filter [year = 2000]
    SubqueryAlias a
      Filter [year = 2000 or year = 2001]
        ...
  Filter [year = 2001]
    SubqueryAlias b
      Filter [year = 2000 or year = 2001]
        ...

Note that the two subqueries have the same Filter pushed down, so they are still common subqueries and their result can be reused.
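
Here is the Filter case of that combination step in isolation, a sketch that assumes the per-reference conditions have already been collected (pushDownCombined is an illustrative helper, not the PR's OptimizeCommonSubqueries code):

import org.apache.spark.sql.catalyst.expressions.{Expression, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Combine the conditions of all references with Or and push one identical
// Filter into every copy of the common subquery. This is what turns
// [year = 2000] and [year = 2001] into [year = 2000 or year = 2001].
def pushDownCombined(conditions: Seq[Expression], subquery: LogicalPlan): LogicalPlan = {
  val combined = conditions.reduceLeft[Expression](Or(_, _))
  Filter(combined, subquery)
}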

How was this patch tested?

Jenkins tests.

@viirya viirya force-pushed the single-exec-subquery branch from 00b29ed to 55a44c8 on August 2, 2016 05:48
@SparkQA commented Aug 2, 2016

Test build #63107 has finished for PR 14452 at commit 00b29ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan]

@SparkQA commented Aug 2, 2016

Test build #63108 has finished for PR 14452 at commit 55a44c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan]

@viirya viirya force-pushed the single-exec-subquery branch from 55a44c8 to ebaa1c6 on August 2, 2016 07:54
@SparkQA commented Aug 2, 2016

Test build #63114 has finished for PR 14452 at commit ebaa1c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan]

@viirya (Member, Author) commented Aug 2, 2016

cc @cloud-fan @hvanhovell Can you take a look? Thanks!

@viirya viirya force-pushed the single-exec-subquery branch from ebaa1c6 to 6bfdb32 on August 2, 2016 12:54
@SparkQA commented Aug 2, 2016

Test build #63120 has finished for PR 14452 at commit 6bfdb32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan]

@SparkQA commented Aug 4, 2016

Test build #63202 has finished for PR 14452 at commit 7fe57a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the single-exec-subquery branch from 7fe57a0 to 6bfdb32 on August 4, 2016 06:24
@SparkQA commented Aug 4, 2016

Test build #63216 has finished for PR 14452 at commit 6bfdb32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DedupCommonSubqueries(sparkSession: SparkSession) extends Rule[LogicalPlan]

@viirya viirya changed the title from "[SPARK-16849][SQL] Improve subquery execution by deduplicating the subqueries with the same results" to "[SPARK-16849][SQL] Improve subquery execution in CTE by deduplicating the subqueries with the same results" on Aug 5, 2016
@SparkQA commented Aug 5, 2016

Test build #63270 has finished for PR 14452 at commit 229ae31.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Aug 6, 2016

ping @cloud-fan @hvanhovell Could you take a look and see whether this makes sense to you? Thanks.

@viirya (Member, Author) commented Aug 9, 2016

ping @cloud-fan @hvanhovell @liancheng Could you review this change? Thanks.

@gatorsmile (Member)

Will the deduplication logic for conflicting attributes in the Analyzer affect your solution?

/**
* Generate a new logical plan for the right child with different expression IDs
* for all conflicting attributes.
*/
private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {

@viirya (Member, Author) commented Aug 10, 2016

@gatorsmile I think it doesn't affect this; the change happens after analysis.

@SparkQA commented Aug 10, 2016

Test build #63491 has finished for PR 14452 at commit 734b050.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class ShuffleIndexInformation
    • public class ShuffleIndexRecord
    • class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
    • case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic
    • case class SparkPartitionID() extends LeafExpression with Nondeterministic
    • case class AggregateExpression(
    • case class Least(children: Seq[Expression]) extends Expression
    • case class Greatest(children: Seq[Expression]) extends Expression
    • case class CurrentDatabase() extends LeafExpression with Unevaluable
    • class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
    • class AbstractScalaRowIterator[T] extends Iterator[T]
    • class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double]
    • class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long]
    • class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long]
    • class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double]
    • case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
    • case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan]
    • implicit class SchemaAttribute(f: StructField)

@gatorsmile (Member)

Will it affect sameResult?

@viirya (Member, Author) commented Aug 10, 2016

I think deduplicating the conflicting attributes in the Analyzer just generates new expression ids, and sameResult is tolerant of expression id differences.

@gatorsmile (Member)

If sameResult does not consider the expression id, will it return true if two tables have the same column names? If my understanding is correct, it only tolerates Aliases with different expression IDs.

@SparkQA commented Aug 10, 2016

Test build #63494 has finished for PR 14452 at commit 70af8d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Aug 10, 2016

Yeah, but other attribute references will be bound before the comparison in sameResult, so the expression id differences should have no effect. What do you think?

@SparkQA commented Aug 10, 2016

Test build #63496 has finished for PR 14452 at commit ba11d34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

It is just bound to its own children, right? If you have two CTEs, their children might have different expression IDs after the deduplication logic for conflicting attributes in the Analyzer.

@rxin (Contributor) commented Oct 11, 2016

Adding an explicit cache call, btw, can actually slow things down due to bad memory management.

@viirya (Member, Author) commented Oct 11, 2016

@rxin yeah, as I tried, adding the explicit cache call doesn't improve it, so I removed it.

According to my test on a local cluster, without the cache call I can't see an actual speed-up; only with the cache call can I see an improvement.

But it is true that effective caching for SQL needs an additional copy call, which adds cost and reduces the improvement.

@SparkQA commented Dec 23, 2016

Test build #70541 has started for PR 14452 at commit 9faf90a.

@viirya (Member, Author) commented Dec 23, 2016

Revisiting this after rebasing onto master.

BTW, of the 500+ LOC changed, 200+ LOC are test cases.

@viirya (Member, Author) commented Dec 23, 2016

retest this please.

@SparkQA commented Dec 23, 2016

Test build #70542 has finished for PR 14452 at commit 9faf90a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

After reading the PR description, I think this improvement is quite complex and worth a design doc. We should explain how subquery execution works now and how you are going to change it.

@viirya (Member, Author) commented Dec 24, 2016

@cloud-fan Thanks for the comment. I agree. I will prepare a design doc soon.

@davies (Contributor) commented Dec 24, 2016

@viirya For duplicated CTEs, without some optimization (pushing down different predicates at different positions), the physical plans should be identical. So I'm wondering whether some aggressive pushdown causes the problem for some queries (IsNotNull(xxx)). This is the reason I asked.

@viirya (Member, Author) commented Dec 26, 2016

@davies It is true that pushing down different predicates results in different CTE logical/physical plans. I spent some of the LOC in this change addressing those cases, i.e., preparing a disjunctive predicate for duplicated CTEs with different predicates.

For Q64, a disjunctive predicate will be pushed down too. I am not sure what problem you mean.

Let me try to get and show the pushed-down predicate.

@viirya (Member, Author) commented Dec 26, 2016

I would like to share some numbers from running this on my local cluster today (5 nodes, on YARN, 8GB per node).

After this patch (we cache the output of duplicated CTEs):

q2: 10037 ms (-7%) (no disjunctive predicate pushed down, 2 duplicated and simple CTEs)
q11: 33713 ms (-13%) (complicated disjunctive predicate pushed down to 4 duplicated CTEs)
q39a: 13556 ms (-5%) (simple disjunctive predicate pushed down, 2 duplicated CTEs)
q39b: 11957 ms (-3%) (similar to q39a)
q47: 34854 ms (+25%) (no disjunctive predicate pushed down, 3 duplicated and complicated CTEs)
q57: 28388 ms (+18%) (similar to q47)
q59: 11662 ms (-12%) (no disjunctive predicate pushed down, 2 duplicated and simple CTEs)
q74: 29997 ms (-1%) (complicated disjunctive predicate pushed down to 4 duplicated CTEs)
q75: 22542 ms (+19%) (simple disjunctive predicate pushed down, 2 duplicated and complicated CTEs)

• Because caching introduces extra cost, if the duplicated CTEs are not complicated enough, this can be a regression.
• If the duplicated CTEs are used with different filter conditions, a disjunctive predicate is pushed down into the CTEs. If that disjunctive predicate is complicated, this can also be a regression.

Before this patch:

q2: 9414 ms
q11: 29764 ms
q39a: 12961 ms
q39b: 11578 ms
q47: 46424 ms
q57: 34798 ms
q59: 10420 ms
q74: 29574 ms
q75: 27767 ms

Q64 causes an out-of-memory error on the cluster. All of these queries are CTE queries.

The data was generated with spark-sql-perf using scaleFactor = 1 so that these queries can run on my cluster.

@SparkQA commented Dec 26, 2016

Test build #70587 has finished for PR 14452 at commit 14e1519.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

Can you provide a typical query that can benefit from this PR, and also show why it's slow without it? Thanks!

@viirya viirya force-pushed the single-exec-subquery branch from 14e1519 to e2bd146 on December 26, 2016 13:51
@viirya (Member, Author) commented Dec 26, 2016

@cloud-fan ok. I will do it.

@viirya (Member, Author) commented Dec 26, 2016

@rxin has pointed out:

Adding an explicit cache call btw -- can actually slow things down due to bad memory management.

In my experiment, I found:

According to my test in local cluster, without the cache call, I can't see actual speed-up. Only with cache call, I can see improvement.

However, effective caching for rows needs an additional copy call, which adds cost and reduces the improvement.

@SparkQA commented Dec 26, 2016

Test build #70604 has finished for PR 14452 at commit e2bd146.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Dec 28, 2016

Q47 is a typical query that can benefit from this PR:

with v1 as(
select i_category, i_brand,
       s_store_name, s_company_name,
       d_year, d_moy,
       sum(ss_sales_price) sum_sales,
       avg(sum(ss_sales_price)) over
         (partition by i_category, i_brand,
                    s_store_name, s_company_name, d_year)
         avg_monthly_sales,
       rank() over
         (partition by i_category, i_brand,
                    s_store_name, s_company_name
          order by d_year, d_moy) rn
from item, store_sales, date_dim, store
where ss_item_sk = i_item_sk and
      ss_sold_date_sk = d_date_sk and
      ss_store_sk = s_store_sk and
      (
        d_year = 1999 or
        ( d_year = 1999-1 and d_moy =12) or
        ( d_year = 1999+1 and d_moy =1)
      )
group by i_category, i_brand,
         s_store_name, s_company_name,
         d_year, d_moy),
v2 as(
select v1.i_category, v1.i_brand, v1.s_store_name, v1.s_company_name, v1.d_year,
       v1.d_moy, v1.avg_monthly_sales ,v1.sum_sales, v1_lag.sum_sales psum,
       v1_lead.sum_sales nsum
from v1, v1 v1_lag, v1 v1_lead
where v1.i_category = v1_lag.i_category and
      v1.i_category = v1_lead.i_category and
      v1.i_brand = v1_lag.i_brand and
      v1.i_brand = v1_lead.i_brand and
      v1.s_store_name = v1_lag.s_store_name and
      v1.s_store_name = v1_lead.s_store_name and
      v1.s_company_name = v1_lag.s_company_name and
      v1.s_company_name = v1_lead.s_company_name and
      v1.rn = v1_lag.rn + 1 and
      v1.rn = v1_lead.rn - 1)
select * from v2
where  d_year = 1999 and
       avg_monthly_sales > 0 and
       case when avg_monthly_sales > 0 then abs(sum_sales - avg_monthly_sales) / avg_monthly_sales else null end > 0.1
order by sum_sales - avg_monthly_sales, 3
limit 100

It has two CTEs, v1 and v2. v2 joins three duplicated v1 plans. No disjunctive predicates need to be pushed down into those v1 plans, so the three duplicated v1 plans produce the same output data in the end. The physical plan of v1 is quite complicated.

Obviously, without this PR we run the physical plan of v1 three times. This PR runs the physical plan of v1 only once and caches its output data.
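
The duplication is visible through the standard API alone (nothing here is PR-specific; q47 is assumed to be a String holding the SQL text above):

// Assumes `spark` is a SparkSession and `q47` holds the query text above.
val df = spark.sql(q47)
// Without this PR the printed physical plan contains v1's subtree three
// times; with it, the three references share one de-duplicated subquery.
df.explain(true)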

@viirya viirya force-pushed the single-exec-subquery branch from e2bd146 to f153c12 on December 28, 2016 06:24
@SparkQA commented Dec 28, 2016

Test build #70655 has started for PR 14452 at commit f153c12.

subqueries.tail.foreach {
  case Filter(otherCond, child) =>
    val rewrites = buildRewrites(child, subqueries(0).asInstanceOf[Filter].child)
    // We can't simply push down all conditions from other Filter by concatenating them with
@viirya (Member, Author):

This part has been extracted out as #15558 and can be removed if that PR is merged.

@SparkQA commented Dec 28, 2016

Test build #70659 has started for PR 14452 at commit aeba1c3.

@viirya (Member, Author) commented Dec 28, 2016

retest this please.

@SparkQA commented Dec 28, 2016

Test build #70661 has finished for PR 14452 at commit aeba1c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Dec 29, 2016

Because the actual improvement from this de-duplication depends on the complexity of the disjunctive predicate pushdown and of the CTE subquery, and we don't have a general rule to decide whether to de-duplicate, I think we could add a config (default off) to enable/disable this subquery de-duplication. Users could then decide whether they want to cache and de-duplicate the subqueries in their query.
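
For illustration, such a switch could be read through the public conf API like the sketch below, reusing the DedupCommonSubqueries rule sketched earlier (the key name is made up here, and the PR never actually added a config):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical flag, default off, read through the public RuntimeConfig API.
def dedupIfEnabled(spark: SparkSession, plan: LogicalPlan): LogicalPlan = {
  val enabled =
    spark.conf.get("spark.sql.subqueryDeduplication.enabled", "false").toBoolean
  // When the flag is off, leave the plan untouched and skip caching entirely.
  if (enabled) DedupCommonSubqueries(spark)(plan) else plan
}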

@viirya viirya closed this Mar 7, 2017
@viirya viirya deleted the single-exec-subquery branch December 27, 2023 18:20