From 6594601ed317606fea50b14e25b28998bc1d19a2 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Mon, 6 Jul 2020 22:23:20 +0800
Subject: [PATCH 1/6] init

---
 .../sql/catalyst/optimizer/expressions.scala | 41 ++++++++++++-------
 .../catalyst/optimizer/OptimizeInSuite.scala | 27 ++++++++++++
 2 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index bd400f86ea2c..a247582cdc77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -235,26 +235,39 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
+  def optimizeIn(expr: In, v: Expression, list: Seq[Expression]): Expression = {
+    val newList = ExpressionSet(list).toSeq
+    if (newList.length == 1
+      // TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed,
+      // TODO: we exclude them in this rule.
+      && !v.isInstanceOf[CreateNamedStruct]
+      && !newList.head.isInstanceOf[CreateNamedStruct]) {
+      EqualTo(v, newList.head)
+    } else if (newList.length > SQLConf.get.optimizerInSetConversionThreshold) {
+      val hSet = newList.map(e => e.eval(EmptyRow))
+      InSet(v, HashSet() ++ hSet)
+    } else if (newList.length < list.length) {
+      expr.copy(list = newList)
+    } else { // newList.length == list.length && newList.length > 1
+      expr
+    }
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty =>
         // When v is not nullable, the following expression will be optimized
         // to FalseLiteral which is tested in OptimizeInSuite.scala
         If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
-      case expr @ In(v, list) if expr.inSetConvertible =>
-        val newList = ExpressionSet(list).toSeq
-        if (newList.length == 1
-          // TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed,
-          // TODO: we exclude them in this rule.
-          && !v.isInstanceOf[CreateNamedStruct]
-          && !newList.head.isInstanceOf[CreateNamedStruct]) {
-          EqualTo(v, newList.head)
-        } else if (newList.length > SQLConf.get.optimizerInSetConversionThreshold) {
-          val hSet = newList.map(e => e.eval(EmptyRow))
-          InSet(v, HashSet() ++ hSet)
-        } else if (newList.length < list.length) {
-          expr.copy(list = newList)
-        } else { // newList.length == list.length && newList.length > 1
+      case expr @ In(v, list) =>
+        // split list to 2 parts so that we can push down convertible part
+        val (convertible, nonConvertible) = list.partition(_.isInstanceOf[Literal])
+        if (convertible.nonEmpty && nonConvertible.isEmpty) {
+          optimizeIn(expr, v, list)
+        } else if (convertible.nonEmpty && nonConvertible.nonEmpty) {
+          val optimizedIn = optimizeIn(In(v, convertible), v, convertible)
+          And(optimizedIn, In(v, nonConvertible))
+        } else {
           expr
         }
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index a36083b84704..7ec6e6f4b1ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -238,4 +238,31 @@ class OptimizeInSuite extends PlanTest {

     comparePlans(optimized, correctAnswer)
   }
+
+  test("SPARK-32196: Extract convertible part if In is not convertible") {
+    val originalQuery1 =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), UnresolvedAttribute("b"))))
+        .analyze
+    val optimized1 = Optimize.execute(originalQuery1)
+    val correctAnswer1 =
+      testRelation
+        .where(
+          And(EqualTo(UnresolvedAttribute("a"), Literal(1)),
+            In(UnresolvedAttribute("a"), Seq(UnresolvedAttribute("b"))))
+        )
+        .analyze
+    comparePlans(optimized1, correctAnswer1)
+
+    val originalQuery2 =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(UnresolvedAttribute("b"))))
+        .analyze
+    val optimized2 = Optimize.execute(originalQuery2)
+    val correctAnswer2 =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(UnresolvedAttribute("b"))))
+        .analyze
+    comparePlans(optimized2, correctAnswer2)
+  }
 }
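Note: patch 1 moves the existing literal-only handling into an `optimizeIn` helper and routes the literal part of a mixed IN-list through it. The snippet below is not part of the patch; it is a rough, stand-alone sketch of the shapes `optimizeIn` can return, assuming the default spark.sql.optimizer.inSetConversionThreshold of 10 and using an attribute `a` purely for illustration:

    import scala.collection.immutable.HashSet
    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions.{EmptyRow, EqualTo, In, InSet, Literal}

    val a = UnresolvedAttribute("a")

    // one distinct (non-struct) literal: a IN (1) becomes an equality check
    val single = EqualTo(a, Literal(1))

    // a few distinct literals (at most the threshold): In is kept, duplicates dropped
    val small = In(a, Seq(Literal(1), Literal(2)))

    // more literals than the threshold: In is replaced by InSet over the evaluated values
    val values = (1 to 11).map(i => Literal(i).eval(EmptyRow))
    val large = InSet(a, HashSet() ++ values)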
From bbf63c91b614dae1c97cb20fb58b497ffe2c72f3 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Mon, 6 Jul 2020 22:31:27 +0800
Subject: [PATCH 2/6] up ut name

---
 .../apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 7ec6e6f4b1ae..447c75e51ee1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -239,7 +239,7 @@ class OptimizeInSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

-  test("SPARK-32196: Extract convertible part if In is not convertible") {
+  test("SPARK-32196: Extract In convertible part if it is not convertible") {
     val originalQuery1 =
       testRelation
         .where(In(UnresolvedAttribute("a"), Seq(Literal(1), UnresolvedAttribute("b"))))
         .analyze

From 0d982d2ebb5fd04d581b3a38e329d38988485659 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Tue, 7 Jul 2020 08:08:40 +0800
Subject: [PATCH 3/6] fix

---
 .../sql/catalyst/optimizer/expressions.scala     |  5 +++--
 .../sql/catalyst/optimizer/OptimizeInSuite.scala | 15 ---------------
 2 files changed, 3 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index a247582cdc77..e92ec469c152 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -230,8 +230,9 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * Optimize IN predicates:
  * 1. Converts the predicate to false when the list is empty and
  *    the value is not nullable.
- * 2. Removes literal repetitions.
- * 3. Replaces [[In (value, seq[Literal])]] with optimized version
+ * 2. Extract convertible part from list.
+ * 3. Removes literal repetitions.
+ * 4. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
  */
 object OptimizeIn extends Rule[LogicalPlan] {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 447c75e51ee1..9433fcfa2b47 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -91,21 +91,6 @@ class OptimizeInSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

-  test("OptimizedIn test: In clause not optimized in case filter has attributes") {
-    val originalQuery =
-      testRelation
-        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), UnresolvedAttribute("b"))))
-        .analyze
-
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer =
-      testRelation
-        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), UnresolvedAttribute("b"))))
-        .analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
   test("OptimizedIn test: NULL IN (expr1, ..., exprN) gets transformed to Filter(null)") {
     val originalQuery =
       testRelation
From f45d6c31f8d4b840e212dd0561d97ba96e13b458 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Tue, 7 Jul 2020 09:51:27 +0800
Subject: [PATCH 4/6] fix or

---
 .../org/apache/spark/sql/catalyst/optimizer/expressions.scala | 2 +-
 .../apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index e92ec469c152..9410d4279360 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -267,7 +267,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
           optimizeIn(expr, v, list)
         } else if (convertible.nonEmpty && nonConvertible.nonEmpty) {
           val optimizedIn = optimizeIn(In(v, convertible), v, convertible)
-          And(optimizedIn, In(v, nonConvertible))
+          Or(optimizedIn, In(v, nonConvertible))
         } else {
           expr
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 9433fcfa2b47..1014a207f89b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -233,7 +233,7 @@ class OptimizeInSuite extends PlanTest {
     val correctAnswer1 =
       testRelation
         .where(
-          And(EqualTo(UnresolvedAttribute("a"), Literal(1)),
+          Or(EqualTo(UnresolvedAttribute("a"), Literal(1)),
             In(UnresolvedAttribute("a"), Seq(UnresolvedAttribute("b"))))
         )
         .analyze
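Note: the switch from And to Or in patch 4 is the semantically correct combination. An IN predicate is a chain of equality checks joined by OR, so splitting the list and re-joining the two halves with Or leaves the result, including its NULL handling, unchanged: a IN (1, b) is equivalent to a = 1 OR a = b, i.e. to a IN (1) OR a IN (b). Below is a minimal stand-alone sketch of the rewritten shape, mirroring the expressions used in the updated test; it is an illustration, not code from the patch:

    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.expressions.{EqualTo, In, Literal, Or}

    val a = UnresolvedAttribute("a")
    val b = UnresolvedAttribute("b")

    // original predicate: a IN (1, b)
    val original = In(a, Seq(Literal(1), b))

    // shape produced after extracting the literal part: a = 1 OR a IN (b)
    val rewritten = Or(EqualTo(a, Literal(1)), In(a, Seq(b)))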
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala

From 9bf23cc95e1a58044ce5c4e41348f93d99c705a6 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Wed, 8 Jul 2020 12:55:38 +0800
Subject: [PATCH 5/6] fix comment

---
 .../org/apache/spark/sql/catalyst/optimizer/expressions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 9410d4279360..e7ae91759525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -261,7 +261,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
         // to FalseLiteral which is tested in OptimizeInSuite.scala
         If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
       case expr @ In(v, list) =>
-        // split list to 2 parts so that we can push down convertible part
+        // split list to 2 parts so that we can optimize convertible part
         val (convertible, nonConvertible) = list.partition(_.isInstanceOf[Literal])
         if (convertible.nonEmpty && nonConvertible.isEmpty) {
           optimizeIn(expr, v, list)
         } else if (convertible.nonEmpty && nonConvertible.nonEmpty) {
           val optimizedIn = optimizeIn(In(v, convertible), v, convertible)

From 21c5262e7025a7e1453c1aeaa901ada0781e875d Mon Sep 17 00:00:00 2001
From: ulysses
Date: Sun, 12 Jul 2020 09:04:12 +0800
Subject: [PATCH 6/6] add conf

---
 .../sql/catalyst/optimizer/expressions.scala |  3 +-
 .../apache/spark/sql/internal/SQLConf.scala  | 10 +++++
 .../catalyst/optimizer/OptimizeInSuite.scala | 40 +++++++++++++------
 3 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index e7ae91759525..00395fe3bd0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -265,7 +265,8 @@ object OptimizeIn extends Rule[LogicalPlan] {
         val (convertible, nonConvertible) = list.partition(_.isInstanceOf[Literal])
         if (convertible.nonEmpty && nonConvertible.isEmpty) {
           optimizeIn(expr, v, list)
-        } else if (convertible.nonEmpty && nonConvertible.nonEmpty) {
+        } else if (convertible.nonEmpty && nonConvertible.nonEmpty &&
+            SQLConf.get.optimizerInExtractLiteralPart) {
           val optimizedIn = optimizeIn(In(v, convertible), v, convertible)
           Or(optimizedIn, In(v, nonConvertible))
         } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3149d14c1ddc..deb29d38a354 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -197,6 +197,14 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
+
+  val OPTIMIZER_IN_EXTRACT_LITERAL_PART =
+    buildConf("spark.sql.optimizer.inExtractLiteralPart")
+      .internal()
+      .doc("When true, we will extract and optimize the literal part of in if not all are literal.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(true)

   val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
     buildConf("spark.sql.optimizer.inSetConversionThreshold")
       .internal()
@@ -2761,6 +2769,8 @@ class SQLConf extends Serializable with Logging {

   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

+  def optimizerInExtractLiteralPart: Boolean = getConf(OPTIMIZER_IN_EXTRACT_LITERAL_PART)
+
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

   def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD)
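Note: a quick way to exercise the new flag from a session, assuming the conf lands exactly as defined above; the `spark` session and the table `t` with integer columns `a` and `b` are placeholders for illustration (the flag is internal, so it carries no public documentation):

    // hypothetical spark-shell session; `t` is assumed to already exist
    spark.conf.set("spark.sql.optimizer.inExtractLiteralPart", "false")
    spark.sql("SELECT * FROM t WHERE a IN (1, 2, b)").explain(true)   // the In stays as written
    spark.conf.set("spark.sql.optimizer.inExtractLiteralPart", "true")
    spark.sql("SELECT * FROM t WHERE a IN (1, 2, b)").explain(true)   // roughly a IN (1, 2) OR a IN (b)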
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 1014a207f89b..7fcc82cc9ab5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_IN_EXTRACT_LITERAL_PART
 import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD
 import org.apache.spark.sql.types._

@@ -225,19 +226,32 @@ class OptimizeInSuite extends PlanTest {
   }

   test("SPARK-32196: Extract In convertible part if it is not convertible") {
-    val originalQuery1 =
-      testRelation
-        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), UnresolvedAttribute("b"))))
-        .analyze
-    val optimized1 = Optimize.execute(originalQuery1)
-    val correctAnswer1 =
-      testRelation
-        .where(
-          Or(EqualTo(UnresolvedAttribute("a"), Literal(1)),
-            In(UnresolvedAttribute("a"), Seq(UnresolvedAttribute("b"))))
-        )
-        .analyze
-    comparePlans(optimized1, correctAnswer1)
+    Seq("true", "false").foreach { enable =>
+      withSQLConf(OPTIMIZER_IN_EXTRACT_LITERAL_PART.key -> enable) {
+        val originalQuery1 =
+          testRelation
+            .where(In(UnresolvedAttribute("a"), Seq(Literal(1), UnresolvedAttribute("b"))))
+            .analyze
+        val optimized1 = Optimize.execute(originalQuery1)
+
+        if (enable.toBoolean) {
+          val correctAnswer1 =
+            testRelation
+              .where(
+                Or(EqualTo(UnresolvedAttribute("a"), Literal(1)),
+                  In(UnresolvedAttribute("a"), Seq(UnresolvedAttribute("b"))))
+              )
+              .analyze
+          comparePlans(optimized1, correctAnswer1)
+        } else {
+          val correctAnswer1 =
+            testRelation
+              .where(In(UnresolvedAttribute("a"), Seq(Literal(1), UnresolvedAttribute("b"))))
+              .analyze
+          comparePlans(optimized1, correctAnswer1)
+        }
+      }
+    }

     val originalQuery2 =
       testRelation