Skip to content

Commit 6ce0884

Browse files
marmbruspwendell
authored andcommitted
[SQL] Improve column pruning.
Fixed a bug that was preventing us from ever pruning beneath Joins. ## TPC-DS Q3 ### Before: ``` Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2] Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150) Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79] Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43] HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight Exchange (HashPartitioning [ss_item_sk#36:30], 150) HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight Exchange (HashPartitioning [d_date_sk#6:0], 150) Filter (d_moy#14:8 = 12) HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150) HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None Exchange (HashPartitioning [i_item_sk#57:0], 150) Filter (i_manufact_id#70:13 = 436) HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None ``` ### After ``` Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162] Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150) Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239] Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0] HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight Exchange (HashPartitioning [ss_item_sk#196:2], 150) Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3] HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight Exchange (HashPartitioning [d_date_sk#166:0], 150) Project [d_date_sk#166:0,d_year#172:1] Filter (d_moy#174:2 = 12) HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150) HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None Exchange (HashPartitioning [i_item_sk#217:1], 150) Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2] Filter (i_manufact_id#230:3 = 436) HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None ``` Author: Michael Armbrust <[email protected]> Closes apache#729 from marmbrus/fixPruning and squashes the following commits: 5feeff0 [Michael Armbrust] Improve column pruning.
1 parent 7bb9a52 commit 6ce0884

File tree

1 file changed

+11
-5
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer

1 file changed

+11
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._
2525

2626
object Optimizer extends RuleExecutor[LogicalPlan] {
2727
val batches =
28-
Batch("ConstantFolding", Once,
28+
Batch("ConstantFolding", FixedPoint(100),
2929
NullPropagation,
3030
ConstantFolding,
3131
BooleanSimplification,
3232
SimplifyFilters,
3333
SimplifyCasts) ::
34-
Batch("Filter Pushdown", Once,
34+
Batch("Filter Pushdown", FixedPoint(100),
3535
CombineFilters,
3636
PushPredicateThroughProject,
3737
PushPredicateThroughInnerJoin,
@@ -49,24 +49,27 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
4949
*/
5050
object ColumnPruning extends Rule[LogicalPlan] {
5151
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
52+
// Eliminate attributes that are not needed to calculate the specified aggregates.
5253
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
53-
// Project away references that are not needed to calculate the required aggregates.
5454
a.copy(child = Project(a.references.toSeq, child))
5555

56+
// Eliminate unneeded attributes from either side of a Join.
5657
case Project(projectList, Join(left, right, joinType, condition)) =>
5758
// Collect the list of off references required either above or to evaluate the condition.
5859
val allReferences: Set[Attribute] =
5960
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
60-
/** Applies a projection when the child is producing unnecessary attributes */
61+
62+
/** Applies a projection only when the child is producing unnecessary attributes */
6163
def prunedChild(c: LogicalPlan) =
62-
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
64+
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
6365
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
6466
} else {
6567
c
6668
}
6769

6870
Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
6971

72+
// Combine adjacent Projects.
7073
case Project(projectList1, Project(projectList2, child)) =>
7174
// Create a map of Aliases to their values from the child projection.
7275
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
8386
}).asInstanceOf[Seq[NamedExpression]]
8487

8588
Project(substitutedProjection, child)
89+
90+
// Eliminate no-op Projects
91+
case Project(projectList, child) if(child.output == projectList) => child
8692
}
8793
}
8894

0 commit comments

Comments
 (0)