From 6f01ac9c5b85109a194d31c8afd25e00fda77f0a Mon Sep 17 00:00:00 2001
From: Navin Viswanath
Date: Mon, 20 Jul 2020 16:08:08 -0700
Subject: [PATCH 1/3] [SPARK-30876][SQL] Optimizer fails to infer constraints
 within join

What changes were proposed in this pull request?

For a 3-way join of the kind shown below, the optimizer fails to infer the
constraint a = 1. This is caused by the interaction of three rules:
ColumnPruning, InferFiltersFromConstraints, and PushDownPredicates.

For the following SQL query:

```
create table t1(a int, b int, c int);
create table t2(a int, b int, c int);
create table t3(a int, b int, c int);

select count(*) from t1 join t2 join t3
on (t1.a = t2.b and t2.b = t3.c and t3.c = 1);
```

the optimized logical plan produced is:

```
== Optimized Logical Plan ==
Aggregate [count(1) AS count(1)#66L]
+- Project
   +- Join Inner, (b#61 = c#65)
      :- Project [b#61]
      :  +- Join Inner, (a#57 = b#61)
      :     :- Project [a#57]
      :     :  +- Filter isnotnull(a#57)
      :     :     +- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#57, b#58, c#59], Statistics(sizeInBytes=8.0 EiB)
      :     +- Project [b#61]
      :        +- Filter (isnotnull(b#61) AND (b#61 = 1))
      :           +- HiveTableRelation `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#60, b#61, c#62], Statistics(sizeInBytes=8.0 EiB)
      +- Project [c#65]
         +- Filter (isnotnull(c#65) AND (c#65 = 1))
            +- HiveTableRelation `default`.`t3`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#63, b#64, c#65], Statistics(sizeInBytes=8.0 EiB)
```

Note that the scan of t1 carries only isnotnull(a#57); the filter a#57 = 1 is
missing. The constraint is dropped because, after column pruning,
InferFiltersFromConstraints at the outer join can only infer b = 1: the
Project below that join has already pruned away the constraint that refers
to 'a'. When PushDownPredicates then runs again, the b = 1 constraint is
pushed down to t2, and a = 1 is never derived for t1.

To resolve this, this patch runs InferFiltersFromConstraints and
PushDownPredicates together in a batch executed to fixed point, in place of
running InferFiltersFromConstraints once in a batch. With this change, b = 1
is inferred and pushed down first. The next iteration can then infer a = 1 at
the inner join, since b = 1 is now available below it, and a further run of
PushDownPredicates pushes a = 1 down to t1.

Why are the changes needed?

The additional inferred filter can greatly reduce the number of rows scanned
and joined, improving query performance. This case was also handled correctly
in Spark 2.3, so this fixes a regression.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a unit test. The test fails if InferFiltersFromConstraints is run only
once.
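For illustration, a sketch of the optimized plan expected after this change:
the inferred filter now reaches t1 as well (attribute IDs are reused from the
plan above; the exact Project/Filter layout may differ in an actual run):

```
== Optimized Logical Plan (expected, sketch) ==
Aggregate [count(1) AS count(1)#66L]
+- Project
   +- Join Inner, (b#61 = c#65)
      :- Project [b#61]
      :  +- Join Inner, (a#57 = b#61)
      :     :- Project [a#57]
      :     :  +- Filter (isnotnull(a#57) AND (a#57 = 1))
      :     :     +- HiveTableRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#57, b#58, c#59], Statistics(sizeInBytes=8.0 EiB)
      :     +- Project [b#61]
      :        +- Filter (isnotnull(b#61) AND (b#61 = 1))
      :           +- HiveTableRelation `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#60, b#61, c#62], Statistics(sizeInBytes=8.0 EiB)
      +- Project [c#65]
         +- Filter (isnotnull(c#65) AND (c#65 = 1))
            +- HiveTableRelation `default`.`t3`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#63, b#64, c#65], Statistics(sizeInBytes=8.0 EiB)
```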
---
 .../sql/catalyst/optimizer/Optimizer.scala    |  3 +-
 .../InferFiltersPredicatePushdownSuite.scala  | 76 +++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8e57e9737c73f..b098f5002f03a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -116,7 +116,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
     Batch("Operator Optimization before Inferring Filters", fixedPoint,
       rulesWithoutInferFiltersFromConstraints: _*) ::
-    Batch("Infer Filters", Once,
+    Batch("Infer Filters", fixedPoint,
+      PushDownPredicates,
       InferFiltersFromConstraints) ::
     Batch("Operator Optimization after Inferring Filters", fixedPoint,
       rulesWithoutInferFiltersFromConstraints: _*) ::
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
new file mode 100644
index 0000000000000..a187dbabdd005
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{IsNotNull, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.internal.SQLConf
+
+class InferFiltersPredicatePushdownSuite extends PlanTest {
+
+  object OptimizeOriginal extends Optimizer(
+    new CatalogManager(
+      new SQLConf(),
+      FakeV2SessionCatalog,
+      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf()))) {
+    override def defaultBatches: Seq[Batch] = super.defaultBatches
+  }
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val b = Batch("InferAndPushDownFilters", FixedPoint(100),
+      PushPredicateThroughJoin,
+      ColumnPruning
+    )
+    val batches =
+      b ::
+      Batch("infer filter from constraints", FixedPoint(100),
+        PushDownPredicates,
+        InferFiltersFromConstraints) ::
+      Nil
+  }
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("SPARK-30876: optimize constraints in 3-way join") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val z = testRelation.subquery('z)
+    val originalQuery = x.join(y).join(z)
+      .where(("x.a".attr === "y.b".attr) && ("y.b".attr === "z.c".attr) && ("z.c".attr === 1))
+      .groupBy()(Count(Literal("*"))).analyze
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer = x.where('a === 1 && IsNotNull('a)).select('a)
+      .join(y.where('b === 1 && IsNotNull('b))
+        .select('b), Inner, Some("x.a".attr === "y.b".attr))
+      .select('b)
+      .join(z.where('c === 1 && IsNotNull('c))
+        .select('c), Inner, Some('b === "z.c".attr))
+      .select()
+      .groupBy()(Count(Literal("*"))).analyze
+    comparePlans(optimized, correctAnswer)
+  }
+
+}

From 51977bccf3e5f3d90c3302e3e0fe937978a1f092 Mon Sep 17 00:00:00 2001
From: Navin Viswanath
Date: Thu, 23 Jul 2020 18:02:45 -0700
Subject: [PATCH 2/3] Remove unused code

---
 .../InferFiltersPredicatePushdownSuite.scala  | 17 ++---------------
 1 file changed, 2 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
index a187dbabdd005..ed06a5408de4a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
@@ -17,29 +17,16 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{IsNotNull, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.expressions.{IsNotNull, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.internal.SQLConf
 
 class InferFiltersPredicatePushdownSuite extends PlanTest {
 
-  object OptimizeOriginal extends Optimizer(
-    new CatalogManager(
-      new SQLConf(),
-      FakeV2SessionCatalog,
-      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf()))) {
-    override def defaultBatches: Seq[Batch] = super.defaultBatches
-  }
-
   object Optimize extends RuleExecutor[LogicalPlan] {
     val b = Batch("InferAndPushDownFilters", FixedPoint(100),
       PushPredicateThroughJoin,

From 2b3c82af48a8890a951cc3bf5f85d92b698c4d31 Mon Sep 17 00:00:00 2001
From: Navin Viswanath
Date: Thu, 23 Jul 2020 18:11:32 -0700
Subject: [PATCH 3/3] Fix import order

---
 .../optimizer/InferFiltersPredicatePushdownSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
index ed06a5408de4a..45e8ddd31d0f3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersPredicatePushdownSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.expressions.{IsNotNull, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 class InferFiltersPredicatePushdownSuite extends PlanTest {
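A quick end-to-end check of the fix in spark-shell, where `spark` is
predefined (a sketch, not part of the patches; it assumes the example tables
t1/t2/t3 from patch 1 exist in the current session):

```scala
// After the patch, the scan of t1 should carry the inferred filter (a = 1)
// in addition to isnotnull(a), mirroring the filters on t2 and t3.
val df = spark.sql(
  """select count(*) from t1 join t2 join t3
    |on (t1.a = t2.b and t2.b = t3.c and t3.c = 1)""".stripMargin)
println(df.queryExecution.optimizedPlan.treeString)
```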