From 64989b4c42ed53f0123b79d970a656352baa385e Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 21 Apr 2016 08:46:07 -0700
Subject: [PATCH 1/2] [SPARK-14796][SQL] Add spark.sql.optimizer.minSetSize config option.

---
 .../spark/sql/catalyst/CatalystConf.scala          |  2 ++
 .../sql/catalyst/optimizer/Optimizer.scala         |  7 +++---
 .../optimizer/ConstantFoldingSuite.scala           |  3 ++-
 .../catalyst/optimizer/OptimizeInSuite.scala       | 24 +++++++++++++++++--
 .../apache/spark/sql/internal/SQLConf.scala        | 10 +++++++-
 5 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 0efe3c4d456ee..64f88e48cb19e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -29,6 +29,7 @@ trait CatalystConf {
   def groupByOrdinal: Boolean
 
   def optimizerMaxIterations: Int
+  def optimizerMinSetSize: Int
   def maxCaseBranchesForCodegen: Int
 
   /**
@@ -47,6 +48,7 @@ case class SimpleCatalystConf(
     orderByOrdinal: Boolean = true,
     groupByOrdinal: Boolean = true,
     optimizerMaxIterations: Int = 100,
+    optimizerMinSetSize: Int = 10,
     maxCaseBranchesForCodegen: Int = 20)
   extends CatalystConf {
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e6d554565d442..6f4dfac98ca4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -87,7 +87,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineUnions,
       // Constant folding and strength reduction
       NullPropagation,
-      OptimizeIn,
+      OptimizeIn(conf),
       ConstantFolding,
       LikeSimplification,
       BooleanSimplification,
@@ -682,10 +682,11 @@ object ConstantFolding extends Rule[LogicalPlan] {
  * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]]
  * which is much faster
  */
-object OptimizeIn extends Rule[LogicalPlan] {
+case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && list.size > 10 =>
+      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+          list.size > conf.optimizerMinSetSize =>
         val hSet = list.map(e => e.eval(EmptyRow))
         InSet(v, HashSet() ++ hSet)
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 641c89873dcc4..d9655bbcc2ce1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class ConstantFoldingSuite extends PlanTest {
Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("ConstantFolding", Once, - OptimizeIn, + OptimizeIn(SimpleCatalystConf(true)), ConstantFolding, BooleanSimplification) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index 0e43ce034fb48..e1f4f3d88e59c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.catalyst.optimizer +import scala.collection.immutable.HashSet + +import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.types._ @@ -36,7 +39,7 @@ class OptimizeInSuite extends PlanTest { NullPropagation, ConstantFolding, BooleanSimplification, - OptimizeIn) :: Nil + OptimizeIn(SimpleCatalystConf(true))) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) @@ -128,4 +131,21 @@ class OptimizeInSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("OptimizedIn test: Use configuration.") { + val plan = + testRelation + .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3)))) + .analyze + + val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(true))(plan) + comparePlans(notOptimizedPlan, plan) + + val optimizedPlan = OptimizeIn(SimpleCatalystConf(true, optimizerMinSetSize = 2))(plan) + optimizedPlan match { + case Filter(cond, _) + if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 => + // pass + case _ => fail("Unexpected result for OptimizedIn") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6e7c1bc1333a6..a3d4ed52edffe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -54,10 +54,16 @@ object SQLConf { val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations") .internal() - .doc("The max number of iterations the optimizer and analyzer runs") + .doc("The max number of iterations the optimizer and analyzer runs.") .intConf .createWithDefault(100) + val OPTIMIZER_MIN_SET_SIZE = SQLConfigBuilder("spark.sql.optimizer.minSetSize") + .internal() + .doc("The minimum threshold of set size to be optimized.") + .intConf + .createWithDefault(10) + val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts") .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. 
" + "When set to false, only one SQLContext/HiveContext is allowed to be created " + @@ -529,6 +535,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS) + def optimizerMinSetSize: Int = getConf(OPTIMIZER_MIN_SET_SIZE) + def checkpointLocation: String = getConf(CHECKPOINT_LOCATION) def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) From c7a6d9b0c90640f35703636e23cb31ca9e46f744 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 21 Apr 2016 11:43:47 -0700 Subject: [PATCH 2/2] Address the comments --- .../apache/spark/sql/catalyst/CatalystConf.scala | 4 ++-- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/optimizer/OptimizeInSuite.scala | 12 ++++++------ .../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++++------ 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 64f88e48cb19e..6e798a53adf27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -29,7 +29,7 @@ trait CatalystConf { def groupByOrdinal: Boolean def optimizerMaxIterations: Int - def optimizerMinSetSize: Int + def optimizerInSetConversionThreshold: Int def maxCaseBranchesForCodegen: Int /** @@ -48,7 +48,7 @@ case class SimpleCatalystConf( orderByOrdinal: Boolean = true, groupByOrdinal: Boolean = true, optimizerMaxIterations: Int = 100, - optimizerMinSetSize: Int = 10, + optimizerInSetConversionThreshold: Int = 10, maxCaseBranchesForCodegen: Int = 20) extends CatalystConf { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6f4dfac98ca4b..ffd7c2eb71bf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -686,7 +686,7 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && - list.size > conf.optimizerMinSetSize => + list.size > conf.optimizerInSetConversionThreshold => val hSet = list.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index e1f4f3d88e59c..f1a4ea8280ab7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.optimizer -import scala.collection.immutable.HashSet - import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -39,7 +37,7 @@ class OptimizeInSuite extends PlanTest { NullPropagation, ConstantFolding, BooleanSimplification, - OptimizeIn(SimpleCatalystConf(true))) :: 
+      OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
 }
 
 val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -131,16 +129,18 @@ class OptimizeInSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
 
-  test("OptimizedIn test: Use configuration.") {
+  test("OptimizedIn test: Setting the threshold for turning In into InSet.") {
     val plan =
       testRelation
         .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
         .analyze
 
-    val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(true))(plan)
+    val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))(plan)
     comparePlans(notOptimizedPlan, plan)
 
-    val optimizedPlan = OptimizeIn(SimpleCatalystConf(true, optimizerMinSetSize = 2))(plan)
+    // Reduce the threshold for turning In into InSet.
+    val optimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true,
+      optimizerInSetConversionThreshold = 2))(plan)
     optimizedPlan match {
       case Filter(cond, _)
         if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a3d4ed52edffe..33cf21e7aed9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -58,11 +58,12 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val OPTIMIZER_MIN_SET_SIZE = SQLConfigBuilder("spark.sql.optimizer.minSetSize")
-    .internal()
-    .doc("The minimum threshold of set size to be optimized.")
-    .intConf
-    .createWithDefault(10)
+  val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
+    SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
+      .internal()
+      .doc("The threshold of set size for InSet conversion.")
+      .intConf
+      .createWithDefault(10)
 
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
@@ -535,7 +536,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
 
-  def optimizerMinSetSize: Int = getConf(OPTIMIZER_MIN_SET_SIZE)
+  def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
 
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
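
For readers skimming the diff, here is a small standalone Scala sketch, not part of the patch, of the rewrite OptimizeIn performs; the helper name and Int element type are made up for illustration. Once a literal IN-list grows past the configured threshold, a linear scan over the list is replaced by a hash-set membership probe, which is what makes InSet faster than In for large lists.

    import scala.collection.immutable.HashSet

    // Hypothetical helper, not Spark code: mirrors the In -> InSet rewrite.
    def inPredicate(value: Int, list: Seq[Int], threshold: Int = 10): Boolean =
      if (list.size > threshold) {
        val hSet = HashSet(list: _*) // built once, O(1) membership probes
        hSet.contains(value)
      } else {
        list.contains(value)         // short lists: a plain O(n) scan is fine
      }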
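And a hypothetical end-to-end usage sketch, assuming a Spark 2.x SparkSession bound to a value named spark; the session and the example query are assumptions for illustration, not part of the patch. Lowering the threshold forces even a three-element IN list to be converted, which should show up in the optimized plan as InSet rather than In.

    // Assumes `spark` is an existing Spark 2.x SparkSession.
    spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "2")

    val df = spark.range(10).where("id IN (1, 2, 3)")
    df.explain(true) // optimized plan should now contain InSet instead of In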