@@ -251,7 +251,7 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
251251 // - the merged plan,
252252 // - the attribute mapping from the new to the merged version,
253253 // - optional filters of both plans that need to be propagated and merged in an ancestor
254- // `Aggregate` node if possible.
254+ // `Aggregate` node if possible.
255255 //
256256 // Please note that merging arbitrary plans can be complicated, the current version supports only
257257 // some of the most important nodes.
@@ -348,24 +348,130 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
348348 case _ => None
349349 }
350350
351- // If `Filter`s are not exactly the same we can still try propagating up their differing
352- // condition because in some cases we will be able to merge them in an `Aggregate` parent
353- // node.
354- // E.g.:
355- // SELECT avg(a) FROM t WHERE c = 1
351+ // If `Filter` conditions are not exactly the same we can still try propagating up their
352+ // differing condition because in some cases we will be able to merge them in an `Aggregate`
353+ // parent node. E.g. we can merge:
354+ //
355+ // SELECT avg(a) FROM t WHERE c = 1
356+ //
356357 // and:
357- // SELECT sum(b) FROM t WHERE c = 2
358- // can be merged to:
359- // SELECT namedStruct(
360- // 'a', avg(a) FILTER (WHERE c = 1),
361- // 'b', sum(b) FILTER (WHERE c = 2)) AS mergedValue
358+ //
359+ // SELECT sum(b) FROM t WHERE c = 2
360+ //
361+ // into:
362+ //
363+ // SELECT
364+ // avg(a) FILTER (WHERE c = 1),
365+ // sum(b) FILTER (WHERE c = 2)
362366 // FORM t
363367 // WHERE c = 1 OR c = 2
364368 //
365- // Please note that depending on where the different `Filter`s reside in the plan and on
366- // which column the predicates are defined, we need to check the physical plan to make sure
367- // if `c` is not a partitioning or bucketing column and `c` is not present in pushed down
368- // filters. Otherwise the merged query can suffer performance degradation.
369+ // But there are some sp2cial cases we need to consider:
370+ //
371+ // - The plans to be merged might contain multiple adjacent `Filter` nodes and the parent
372+ // `Filter` nodes should incorporate the propagated filters from child ones during merge.
373+ //
374+ // E.g. adjacent filters can appear in plans when some of the optimization rules (like
375+ // `PushDownPredicates`) are disabled.
376+ //
377+ // Let's consider we want to merge query 1:
378+ //
379+ // SELECT avg(a)
380+ // FROM (
381+ // SELECT * FROM t WHERE c1 = 1
382+ // ) t
383+ // WHERE c2 = 1
384+ //
385+ // and query 2:
386+ //
387+ // SELECT sum(b)
388+ // FROM (
389+ // SELECT * FROM t WHERE c1 = 2
390+ // ) t
391+ // WHERE c2 = 2
392+ //
393+ // then the optimal merged query is:
394+ //
395+ // SELECT
396+ // avg(a) FILTER (WHERE c2 = 1 AND c1 = 1),
397+ // sum(b) FILTER (WHERE c2 = 2 AND c1 = 2)
398+ // FORM (
399+ // SELECT * FROM t WHERE c1 = 1 OR c1 = 2
400+ // ) t
401+ // WHERE (c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2)
402+ //
403+ // This is because the `WHERE (c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2)` parent `Filter`
404+ // condition is more selective than a simple `WHERE c2 = 1 OR c2 = 2` would be as the
405+ // simple condition would let trough rows containing c1 = 1 and c2 = 2, which none of the
406+ // original queries do.
407+ //
408+ // - When we are merging plans to already merged plans the propagated filter conditions
409+ // could grow quickly, which we can avoid with tagging the already propagated filters.
410+ //
411+ // E.g. if we merged the previous optimal merged query and query 3:
412+ //
413+ // SELECT max(b)
414+ // FROM (
415+ // SELECT * FROM t WHERE c1 = 3
416+ // ) t
417+ // WHERE c2 = 3
418+ //
419+ // then a new double-merged query would look like this:
420+ //
421+ // SELECT
422+ // avg(a) FILTER (WHERE
423+ // (c2 = 1 AND c1 = 1) AND
424+ // ((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) AND (c1 = 1 OR c1 = 2))
425+ // ),
426+ // sum(b) FILTER (WHERE
427+ // (c2 = 2 AND c1 = 2) AND
428+ // ((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) AND (c1 = 1 OR c1 = 2))
429+ // ),
430+ // max(b) FILTER (WHERE c2 = 3 AND c1 = 3)
431+ // FORM (
432+ // SELECT * FROM t WHERE (c1 = 1 OR c1 = 2) OR c1 = 3
433+ // ) t
434+ // WHERE
435+ // ((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) AND (c1 = 1 OR c1 = 2)) OR
436+ // (c2 = 3 AND c1 = 3)
437+ //
438+ // which is not optimal and contains unnecessary complex conditions.
439+ //
440+ // Please note that `BooleanSimplification` and other rules could help simplifying filter
441+ // conditions, but when we merge large number if queries in this rule, the plan size can
442+ // increase exponentially and can cause memory issues before `BooleanSimplification` could
443+ // run.
444+ //
445+ // But we can avoid that complexity if we tag already propagated filter conditions with a
446+ // simple `PropagatedFilter` wrapper during merge.
447+ // E.g. the actual merged query of query 1 and query 2 produced by this rule looks like
448+ // this:
449+ //
450+ // SELECT
451+ // avg(a) FILTER (WHERE c2 = 1 AND c1 = 1),
452+ // sum(b) FILTER (WHERE c2 = 2 AND c1 = 2)
453+ // FORM (
454+ // SELECT * FROM t WHERE PropagatedFilter(c1 = 1 OR c1 = 2)
455+ // ) t
456+ // WHERE PropagatedFilter((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2))
457+ //
458+ // And so when we merge query 3 we know that filter conditions tagged with
459+ // `PropagatedFilter` can be ignored during filter propagation and thus the we get a much
460+ // simpler double-merged query:
461+ //
462+ // SELECT
463+ // avg(a) FILTER (WHERE c2 = 1 AND c1 = 1),
464+ // sum(b) FILTER (WHERE c2 = 2 AND c1 = 2),
465+ // max(b) FILTER (WHERE c2 = 3 AND c1 = 3)
466+ // FORM (
467+ // SELECT * FROM t WHERE PropagatedFilter(PropagatedFilter(c1 = 1 OR c1 = 2) OR c1 = 3)
468+ // ) t
469+ // WHERE
470+ // PropagatedFilter(
471+ // PropagatedFilter((c2 = 1 AND c1 = 1) OR (c2 = 2 AND c1 = 2) OR
472+ // (c2 = 3 AND c1 = 3))
473+ //
474+ // At the end of the rule we remove the `PropagatedFilter` wrappers.
369475 case (_, np : Filter , cp : Filter ) =>
370476 tryMergePlans(np.child, cp.child, scanCheck).flatMap {
371477 case (mergedChild, outputMap, newChildFilter, mergedChildFilter) =>
@@ -459,29 +565,27 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
459565 }
460566
461567 /**
462- * - When we merge projection nodes (`Project` and `Aggregate`) we need to merge the named
463- * expression list coming from the new plan node into the expressions of the projection node of
464- * the merged child plan and return a merged list of expressions that will be placed into the
465- * merged projection node.
568+ * Merges named expression lists of `Project` or `Aggregate` nodes of the new plan into the named
569+ * expression list of a similar node of the cached plan.
570+ *
466571 * - Before we can merge the new expressions, we need to take into account the propagated
467- * attribute mapping that describes the transformation from the input attributes the new plan's
468- * projection node to the input attributes of the merged child plan's projection node.
469- * - While merging the new expressions we need to build a new attribute mapping that describes
470- * the transformation from the output attributes of the new expressions to the output attributes
471- * of the merged list of expression.
472- * - If any filters are propagated from `Filter` nodes below, we need to transform the expressions
473- * to named expressions and merge them into the cached expressions as we did with new expressions.
572+ * attribute mapping that describes the transformation from the input attributes of the new plan
573+ * node to the output attributes of the already merged child plan node.
574+ * - While merging the new expressions we need to build a new attribute mapping to propagate.
575+ * - If any filters are propagated from `Filter` nodes below then we could add all the referenced
576+ * attributes of filter conditions to the merged expression list, but it is better if we alias
577+ * whole filter conditions and propagate only the new boolean attributes.
474578 *
475- * @param newExpressions the expressions of the new plan's projection node
476- * @param outputMap the propagated attribute mapping
477- * @param cachedExpressions the expressions of the cached plan's projection node
478- * @param newChildFilter the propagated filters from `Filter` nodes of the new plan
579+ * @param newExpressions the expression list of the new plan node
580+ * @param outputMap the propagated attribute mapping
581+ * @param cachedExpressions the expression list of the cached plan node
582+ * @param newChildFilter the propagated filters from `Filter` nodes of the new plan
479583 * @param mergedChildFilter the propagated filters from `Filter` nodes of the merged child plan
480584 * @return A tuple of:
481585 * - the merged expression list,
482586 * - the new attribute mapping to propagate,
483- * - the output attributes of the merged newChildFilter to propagate,
484- * - the output attributes of the merged mergedChildFilter to propagate,
587+ * - the output attribute of the merged newChildFilter to propagate,
588+ * - the output attribute of the merged mergedChildFilter to propagate
485589 */
486590 private def mergeNamedExpressions (
487591 newExpressions : Seq [NamedExpression ],
0 commit comments