From ed5307f1fb757b8fc54a3d82fbaa182ca63b9591 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 18 Feb 2022 16:26:53 -0800 Subject: [PATCH 01/10] Allow operator with ClusteredDistribution to require full clustering keys --- .../plans/physical/partitioning.scala | 8 +-- .../apache/spark/sql/internal/SQLConf.scala | 9 +-- .../spark/sql/catalyst/ShuffleSpecSuite.scala | 6 +- .../exchange/EnsureRequirements.scala | 18 +++++- .../execution/streaming/StreamExecution.scala | 5 ++ .../spark/sql/DataFrameAggregateSuite.scala | 59 ++++++++++++++++++- .../exchange/EnsureRequirementsSuite.scala | 2 +- 7 files changed, 91 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 040f1bfab65b..31008b12d94f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -520,10 +520,10 @@ case class HashShuffleSpec( override def canCreatePartitioning: Boolean = { // To avoid potential data skew, we don't allow `HashShuffleSpec` to create partitioning if - // the hash partition keys are not the full join keys (the cluster keys). Then the planner - // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all - // the join keys. - if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) { + // the hash partition keys are not the full clustering keys. Then the planner will add shuffles + // with the default partitioning of `ClusteredDistribution`, which uses all the clustering + // keys. + if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION)) { partitioning.expressions.length == distribution.clustering.length && partitioning.expressions.zip(distribution.clustering).forall { case (l, r) => l.semanticEquals(r) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3a7ce650ea63..8c306e30fc51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -396,13 +396,14 @@ object SQLConf { .booleanConf .createWithDefault(true) - val REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION = - buildConf("spark.sql.requireAllClusterKeysForCoPartition") + val REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION = + buildConf("spark.sql.requireAllClusterKeysForHashPartition") .internal() .doc("When true, the planner requires all the clustering keys as the hash partition keys " + "of the children, to eliminate the shuffles for the operator that needs its children to " + - "be co-partitioned, such as JOIN node. This is to avoid data skews which can lead to " + - "significant performance regression if shuffles are eliminated.") + "be hash partitioned such as WINDOW node , or hash co-partitioned such as JOIN node. 
" + + "This is to avoid data skews which can lead to significant performance regression if " + + "shuffles are eliminated.") .version("3.3.0") .booleanConf .createWithDefault(true) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala index 74ec949fe447..745a8ea666b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala @@ -350,17 +350,17 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper { test("canCreatePartitioning") { val distribution = ClusteredDistribution(Seq($"a", $"b")) - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "false") { assert(HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution).canCreatePartitioning) } - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "true") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") { assert(!HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution) .canCreatePartitioning) assert(HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution) .canCreatePartitioning) } assert(SinglePartitionShuffleSpec.canCreatePartitioning) - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "false") { assert(ShuffleSpecCollection(Seq( HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution), HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index de1806ab87b4..58a7bd5c8921 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -56,7 +56,23 @@ case class EnsureRequirements( // Ensure that the operator's children satisfy their output distribution requirements. var children = originalChildren.zip(requiredChildDistributions).map { case (child, distribution) if child.outputPartitioning.satisfies(distribution) => - child + (child.outputPartitioning, distribution) match { + case (p: HashPartitioning, d: ClusteredDistribution) => + val spec = HashShuffleSpec(p, d) + if (spec.canCreatePartitioning) { + child + } else { + // If shuffle spec cannot create desired partitioning, add an extra shuffle for + // `ClusteredDistribution` even though its child `HashPartitioning` satisfies its + // distribution. This could happen when child `HashPartitioning` is partitioned on + // subset of clustering keys of `ClusteredDistribution`. Opt in this feature with + // enabling "spark.sql.requireAllClusterKeysForHashPartition" to require partition on + // full clustering keys, can help avoid potential data skewness for some jobs. 
+ val numPartitions = d.requiredNumPartitions.getOrElse(conf.numShufflePartitions) + ShuffleExchangeExec(d.createPartitioning(numPartitions), child, shuffleOrigin) + } + case _ => child + } case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) case (child, distribution) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bbc6fa05d514..d373c712c108 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -287,6 +287,11 @@ abstract class StreamExecution( // Disable cost-based join optimization as we do not want stateful operations // to be rearranged sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false") + // Disable any config affecting the required child distribution of stateful operators. + // Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution for + // details. + sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key, + "false") updateStatusMessage("Initializing sources") // force initialization of the logical plan so that the sources can be created diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 215d38d8b167..e17a735f54ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -24,10 +24,12 @@ import scala.util.Random import org.scalatest.matchers.must.Matchers.the import org.apache.spark.SparkException -import org.apache.spark.sql.execution.WholeStageCodegenExec +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.execution.{InputAdapter, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1453,6 +1455,57 @@ class DataFrameAggregateSuite extends QueryTest val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id") checkAnswer(df, Row(2, 3, 1)) } + + test("SPARK-38237: require all cluster keys for child required distribution") { + def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = { + expressions.flatMap { + case ref: AttributeReference => Some(ref.name) + } + } + + def isShuffleExecByRequirement( + plan: ShuffleExchangeExec, + desiredClusterColumns: Seq[String], + desiredNumPartitions: Int): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) + if partitionExpressionsColumns(op.expressions) === desiredClusterColumns && + op.numPartitions === desiredNumPartitions => true + + case _ => false + } 
+ + val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value") + + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") { + + val grouped = df + // repartition by sub group keys which satisfies ClusteredDistribution(group keys) + .repartition($"key1") + .groupBy($"key1", $"key2") + .agg(sum($"value")) + + checkAnswer(grouped, Seq(Row("a", 1, 1), Row("a", 2, 2), Row("b", 1, 7))) + + val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) + + val shuffleByRequirement = grouped.queryExecution.executedPlan.flatMap { + case a if a.isInstanceOf[BaseAggregateExec] => + a.children.head match { + case InputAdapter(s: ShuffleExchangeExec) + if isShuffleExecByRequirement(s, Seq("key1", "key2"), numPartitions) => Some(s) + case s: ShuffleExchangeExec + if isShuffleExecByRequirement(s, Seq("key1", "key2"), numPartitions) => Some(s) + case _ => None + } + + case _ => None + } + + assert(shuffleByRequirement.nonEmpty, "Can't find desired shuffle node from the query plan") + } + } } case class B(c: Option[Double]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index db99557466d9..93fd1e9d1149 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -140,7 +140,7 @@ class EnsureRequirementsSuite extends SharedSparkSession { private def applyEnsureRequirementsWithSubsetKeys(plan: SparkPlan): SparkPlan = { var res: SparkPlan = null - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "false") { res = EnsureRequirements.apply(plan) } res From 27244d2e0c4994ff039a1fc3d68f9e7e658704dc Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 18 Feb 2022 17:03:38 -0800 Subject: [PATCH 02/10] Rewrite the logic to make it disable by default --- .../plans/physical/partitioning.scala | 24 ++++++++++++------- .../apache/spark/sql/internal/SQLConf.scala | 20 ++++++++++++---- .../spark/sql/catalyst/ShuffleSpecSuite.scala | 6 ++--- .../exchange/EnsureRequirements.scala | 19 ++++++++------- .../execution/streaming/StreamExecution.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- .../exchange/EnsureRequirementsSuite.scala | 2 +- 7 files changed, 47 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 31008b12d94f..765b06b45f34 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -271,6 +271,17 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = HashShuffleSpec(this, distribution) + /** + * Checks if [[HashPartitioning]] is partitioned on exactly same full `clustering` keys of + * [[ClusteredDistribution]]. 
+ */ + def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = { + expressions.length == distribution.clustering.length && + expressions.zip(distribution.clustering).forall { + case (l, r) => l.semanticEquals(r) + } + } + /** * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less * than numPartitions) based on hashing expressions. @@ -520,14 +531,11 @@ case class HashShuffleSpec( override def canCreatePartitioning: Boolean = { // To avoid potential data skew, we don't allow `HashShuffleSpec` to create partitioning if - // the hash partition keys are not the full clustering keys. Then the planner will add shuffles - // with the default partitioning of `ClusteredDistribution`, which uses all the clustering - // keys. - if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION)) { - partitioning.expressions.length == distribution.clustering.length && - partitioning.expressions.zip(distribution.clustering).forall { - case (l, r) => l.semanticEquals(r) - } + // the hash partition keys are not the full join keys (the cluster keys). Then the planner + // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all + // the join keys. + if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) { + partitioning.isPartitionedOnFullKeys(distribution) } else { true } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8c306e30fc51..403ad7400118 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -396,18 +396,28 @@ object SQLConf { .booleanConf .createWithDefault(true) - val REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION = - buildConf("spark.sql.requireAllClusterKeysForHashPartition") + val REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION = + buildConf("spark.sql.requireAllClusterKeysForCoPartition") .internal() .doc("When true, the planner requires all the clustering keys as the hash partition keys " + "of the children, to eliminate the shuffles for the operator that needs its children to " + - "be hash partitioned such as WINDOW node , or hash co-partitioned such as JOIN node. " + - "This is to avoid data skews which can lead to significant performance regression if " + - "shuffles are eliminated.") + "be co-partitioned, such as JOIN node. This is to avoid data skews which can lead to " + + "significant performance regression if shuffles are eliminated.") .version("3.3.0") .booleanConf .createWithDefault(true) + val REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION = + buildConf("spark.sql.requireAllClusterKeysForSolePartition") + .internal() + .doc("When true, the planner requires all the clustering keys as the hash partition keys " + + "of the child, to eliminate the shuffle for the operator has only one child and requires " + + "child to be partitioned, such as AGGREGATE and WINDOW node. This is to avoid data skews " + + "which can lead to significant performance regression if shuffle is eliminated.") + .version("3.3.0") + .booleanConf + .createWithDefault(false) + val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort") .internal() .doc("When true, enable use of radix sort when possible. 
Radix sort is much faster but " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala index 745a8ea666b8..74ec949fe447 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala @@ -350,17 +350,17 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper { test("canCreatePartitioning") { val distribution = ClusteredDistribution(Seq($"a", $"b")) - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "false") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") { assert(HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution).canCreatePartitioning) } - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "true") { assert(!HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution) .canCreatePartitioning) assert(HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution) .canCreatePartitioning) } assert(SinglePartitionShuffleSpec.canCreatePartitioning) - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "false") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") { assert(ShuffleSpecCollection(Seq( HashShuffleSpec(HashPartitioning(Seq($"a"), 10), distribution), HashShuffleSpec(HashPartitioning(Seq($"a", $"b"), 10), distribution))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 58a7bd5c8921..5fc4d0097eee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.internal.SQLConf /** * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]] @@ -58,18 +59,18 @@ case class EnsureRequirements( case (child, distribution) if child.outputPartitioning.satisfies(distribution) => (child.outputPartitioning, distribution) match { case (p: HashPartitioning, d: ClusteredDistribution) => - val spec = HashShuffleSpec(p, d) - if (spec.canCreatePartitioning) { - child - } else { - // If shuffle spec cannot create desired partitioning, add an extra shuffle for - // `ClusteredDistribution` even though its child `HashPartitioning` satisfies its - // distribution. This could happen when child `HashPartitioning` is partitioned on - // subset of clustering keys of `ClusteredDistribution`. Opt in this feature with - // enabling "spark.sql.requireAllClusterKeysForHashPartition" to require partition on + if (conf.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION) && + requiredChildDistributions.size == 1 && !p.isPartitionedOnFullKeys(d)) { + // Add an extra shuffle for `ClusteredDistribution` even though its child + // `HashPartitioning` satisfies its distribution. 
This could happen when child + // `HashPartitioning` is partitioned on subset of clustering keys of + // `ClusteredDistribution`. Opt in this feature with + // enabling "spark.sql.requireAllClusterKeysForSolePartition" to require partition on // full clustering keys, can help avoid potential data skewness for some jobs. val numPartitions = d.requiredNumPartitions.getOrElse(conf.numShufflePartitions) ShuffleExchangeExec(d.createPartitioning(numPartitions), child, shuffleOrigin) + } else { + child } case _ => child } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d373c712c108..f176a3c6b92c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,7 +290,7 @@ abstract class StreamExecution( // Disable any config affecting the required child distribution of stateful operators. // Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution for // details. - sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key, + sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION.key, "false") updateStatusMessage("Initializing sources") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index e17a735f54ed..145ed012f90d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1478,7 +1478,7 @@ class DataFrameAggregateSuite extends QueryTest withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") { + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION.key -> "true") { val grouped = df // repartition by sub group keys which satisfies ClusteredDistribution(group keys) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index 93fd1e9d1149..db99557466d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -140,7 +140,7 @@ class EnsureRequirementsSuite extends SharedSparkSession { private def applyEnsureRequirementsWithSubsetKeys(plan: SparkPlan): SparkPlan = { var res: SparkPlan = null - withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "false") { + withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") { res = EnsureRequirements.apply(plan) } res From 22696bc4edb3ad8560dc539bd9f34cc9eab1716a Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 18 Feb 2022 17:45:15 -0800 Subject: [PATCH 03/10] Change logic in HashPartitioning.satisfies0 --- .../plans/physical/partitioning.scala | 12 ++++++++++-- .../apache/spark/sql/internal/SQLConf.scala | 11 +++++++---- .../exchange/EnsureRequirements.scala | 19 +------------------ .../execution/streaming/StreamExecution.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- 5 files changed, 20 insertions(+), 26 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 765b06b45f34..43a207f529db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -261,8 +261,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) } - case ClusteredDistribution(requiredClustering, _) => - expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + case c @ ClusteredDistribution(requiredClustering, _) => + if (SQLConf.get.requireAllClusterKeysForHashPartition) { + // Checks `HashPartitioning` is partitioned on exactly full clustering keys of + // `ClusteredDistribution`. Opt in this feature with enabling + // "spark.sql.requireAllClusterKeysForHashPartition", can help avoid potential data + // skewness for some jobs. + isPartitionedOnFullKeys(c) + } else { + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + } case _ => false } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 403ad7400118..662b90e9ac5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -407,12 +407,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - val REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION = - buildConf("spark.sql.requireAllClusterKeysForSolePartition") + val REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION = + buildConf("spark.sql.requireAllClusterKeysForHashPartition") .internal() .doc("When true, the planner requires all the clustering keys as the hash partition keys " + - "of the child, to eliminate the shuffle for the operator has only one child and requires " + - "child to be partitioned, such as AGGREGATE and WINDOW node. This is to avoid data skews " + + "of the children, to eliminate the shuffle for the operator that needs its children to " + + "be hash partitioned, such as AGGREGATE and WINDOW node. 
This is to avoid data skews " + "which can lead to significant performance regression if shuffle is eliminated.") .version("3.3.0") .booleanConf @@ -3962,6 +3962,9 @@ class SQLConf extends Serializable with Logging { def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) + def requireAllClusterKeysForHashPartition: Boolean = + getConf(REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION) + def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 5fc4d0097eee..de1806ab87b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec} -import org.apache.spark.sql.internal.SQLConf /** * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]] @@ -57,23 +56,7 @@ case class EnsureRequirements( // Ensure that the operator's children satisfy their output distribution requirements. var children = originalChildren.zip(requiredChildDistributions).map { case (child, distribution) if child.outputPartitioning.satisfies(distribution) => - (child.outputPartitioning, distribution) match { - case (p: HashPartitioning, d: ClusteredDistribution) => - if (conf.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION) && - requiredChildDistributions.size == 1 && !p.isPartitionedOnFullKeys(d)) { - // Add an extra shuffle for `ClusteredDistribution` even though its child - // `HashPartitioning` satisfies its distribution. This could happen when child - // `HashPartitioning` is partitioned on subset of clustering keys of - // `ClusteredDistribution`. Opt in this feature with - // enabling "spark.sql.requireAllClusterKeysForSolePartition" to require partition on - // full clustering keys, can help avoid potential data skewness for some jobs. - val numPartitions = d.requiredNumPartitions.getOrElse(conf.numShufflePartitions) - ShuffleExchangeExec(d.createPartitioning(numPartitions), child, shuffleOrigin) - } else { - child - } - case _ => child - } + child case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) case (child, distribution) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f176a3c6b92c..d373c712c108 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,7 +290,7 @@ abstract class StreamExecution( // Disable any config affecting the required child distribution of stateful operators. // Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution for // details. 
- sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION.key, + sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key, "false") updateStatusMessage("Initializing sources") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 145ed012f90d..e17a735f54ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1478,7 +1478,7 @@ class DataFrameAggregateSuite extends QueryTest withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_SOLE_PARTITION.key -> "true") { + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") { val grouped = df // repartition by sub group keys which satisfies ClusteredDistribution(group keys) From 89b6c52162a1ebaff6e75c3bc2616690f44a6ae9 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 23 Feb 2022 15:32:58 -0800 Subject: [PATCH 04/10] Rewrite logic to add field inside ClusteredDistribution --- .../plans/physical/partitioning.scala | 45 ++++++++++++++----- .../apache/spark/sql/internal/SQLConf.scala | 18 ++++---- .../execution/streaming/StreamExecution.scala | 2 +- .../spark/sql/DataFrameAggregateSuite.scala | 2 +- 4 files changed, 44 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 43a207f529db..e259119d3478 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -72,10 +72,15 @@ case object AllTuples extends Distribution { /** * Represents data where tuples that share the same values for the `clustering` * [[Expression Expressions]] will be co-located in the same partition. + * + * @param requiredAllClusterKeys When true, `Partitioning` which satisfies this distribution, + * must match all `clustering` expressions in the same ordering. */ case class ClusteredDistribution( clustering: Seq[Expression], - requiredNumPartitions: Option[Int] = None) extends Distribution { + requiredNumPartitions: Option[Int] = None, + requiredAllClusterKeys: Boolean = SQLConf.get.getConf( + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION)) extends Distribution { require( clustering != Nil, "The clustering expressions of a ClusteredDistribution should not be Nil. " + @@ -88,6 +93,19 @@ case class ClusteredDistribution( s"the actual number of partitions is $numPartitions.") HashPartitioning(clustering, numPartitions) } + + /** + * Checks if `expressions` match all `clustering` expressions in the same ordering. + * + * `Partitioning` should call this to check its expressions when `requiredAllClusterKeys` + * is set to true. 
+ */ + def areAllClusterKeysMatched(expressions: Seq[Expression]): Boolean = { + expressions.length == clustering.length && + expressions.zip(clustering).forall { + case (l, r) => l.semanticEquals(r) + } + } } /** @@ -261,13 +279,11 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) } - case c @ ClusteredDistribution(requiredClustering, _) => - if (SQLConf.get.requireAllClusterKeysForHashPartition) { - // Checks `HashPartitioning` is partitioned on exactly full clustering keys of - // `ClusteredDistribution`. Opt in this feature with enabling - // "spark.sql.requireAllClusterKeysForHashPartition", can help avoid potential data - // skewness for some jobs. - isPartitionedOnFullKeys(c) + case c @ ClusteredDistribution(requiredClustering, _, requiredAllClusterKeys) => + if (requiredAllClusterKeys) { + // Checks `HashPartitioning` is partitioned on exactly same clustering keys of + // `ClusteredDistribution`. + c.areAllClusterKeysMatched(expressions) } else { expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) } @@ -341,8 +357,15 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) // `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) - case ClusteredDistribution(requiredClustering, _) => - ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) + case c @ ClusteredDistribution(requiredClustering, _, requiredAllClusterKeys) => + val expressions = ordering.map(_.child) + if (requiredAllClusterKeys) { + // Checks `RangePartitioning` is partitioned on exactly same clustering keys of + // `ClusteredDistribution`. + c.areAllClusterKeysMatched(expressions) + } else { + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + } case _ => false } } @@ -543,7 +566,7 @@ case class HashShuffleSpec( // will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all // the join keys. if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) { - partitioning.isPartitionedOnFullKeys(distribution) + distribution.areAllClusterKeysMatched(partitioning.expressions) } else { true } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 662b90e9ac5f..140586da7e58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -407,13 +407,14 @@ object SQLConf { .booleanConf .createWithDefault(true) - val REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION = - buildConf("spark.sql.requireAllClusterKeysForHashPartition") - .internal() - .doc("When true, the planner requires all the clustering keys as the hash partition keys " + - "of the children, to eliminate the shuffle for the operator that needs its children to " + - "be hash partitioned, such as AGGREGATE and WINDOW node. 
This is to avoid data skews " + - "which can lead to significant performance regression if shuffle is eliminated.") + val REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION = + buildConf("spark.sql.requireAllClusterKeysForDistribution") + .internal() + .doc("When true, the planner requires all the clustering keys as the partition keys " + + "(with same ordering) of the children, to eliminate the shuffle for the operator that " + + "requires its children be clustered distributed, such as AGGREGATE and WINDOW node. " + + "This is to avoid data kews which can lead to significant performance regression if " + + "shuffle is eliminated.") .version("3.3.0") .booleanConf .createWithDefault(false) @@ -3962,9 +3963,6 @@ class SQLConf extends Serializable with Logging { def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) - def requireAllClusterKeysForHashPartition: Boolean = - getConf(REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION) - def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d373c712c108..f9ae65cdc47d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,7 +290,7 @@ abstract class StreamExecution( // Disable any config affecting the required child distribution of stateful operators. // Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution for // details. - sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key, + sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key, "false") updateStatusMessage("Initializing sources") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index e17a735f54ed..8ca64f738ecd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1478,7 +1478,7 @@ class DataFrameAggregateSuite extends QueryTest withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_HASH_PARTITION.key -> "true") { + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "false") { val grouped = df // repartition by sub group keys which satisfies ClusteredDistribution(group keys) From e39c70508793e7e1b65da45f5da88a7020f73854 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 23 Feb 2022 15:34:42 -0800 Subject: [PATCH 05/10] Remove unused method --- .../sql/catalyst/plans/physical/partitioning.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index e259119d3478..80944f0c5555 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -295,17 +295,6 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) override def 
createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = HashShuffleSpec(this, distribution) - /** - * Checks if [[HashPartitioning]] is partitioned on exactly same full `clustering` keys of - * [[ClusteredDistribution]]. - */ - def isPartitionedOnFullKeys(distribution: ClusteredDistribution): Boolean = { - expressions.length == distribution.clustering.length && - expressions.zip(distribution.clustering).forall { - case (l, r) => l.semanticEquals(r) - } - } - /** * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less * than numPartitions) based on hashing expressions. From 3f8e80484ab95398ddc53514d6d4fea28028307b Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 23 Feb 2022 18:24:14 -0800 Subject: [PATCH 06/10] Fix unit test --- .../spark/sql/DataFrameAggregateSuite.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 8ca64f738ecd..29885a229d64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1465,12 +1465,9 @@ class DataFrameAggregateSuite extends QueryTest def isShuffleExecByRequirement( plan: ShuffleExchangeExec, - desiredClusterColumns: Seq[String], - desiredNumPartitions: Int): Boolean = plan match { - case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) - if partitionExpressionsColumns(op.expressions) === desiredClusterColumns && - op.numPartitions === desiredNumPartitions => true - + desiredClusterColumns: Seq[String]): Boolean = plan match { + case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) => + partitionExpressionsColumns(op.expressions) === desiredClusterColumns case _ => false } @@ -1478,7 +1475,7 @@ class DataFrameAggregateSuite extends QueryTest withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "false") { + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true") { val grouped = df // repartition by sub group keys which satisfies ClusteredDistribution(group keys) @@ -1488,15 +1485,13 @@ class DataFrameAggregateSuite extends QueryTest checkAnswer(grouped, Seq(Row("a", 1, 1), Row("a", 2, 2), Row("b", 1, 7))) - val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS) - val shuffleByRequirement = grouped.queryExecution.executedPlan.flatMap { case a if a.isInstanceOf[BaseAggregateExec] => a.children.head match { case InputAdapter(s: ShuffleExchangeExec) - if isShuffleExecByRequirement(s, Seq("key1", "key2"), numPartitions) => Some(s) + if isShuffleExecByRequirement(s, Seq("key1", "key2")) => Some(s) case s: ShuffleExchangeExec - if isShuffleExecByRequirement(s, Seq("key1", "key2"), numPartitions) => Some(s) + if isShuffleExecByRequirement(s, Seq("key1", "key2")) => Some(s) case _ => None } From 4897a86b62f3539616b560033ca21cb38b6e2ca6 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 23 Feb 2022 18:36:11 -0800 Subject: [PATCH 07/10] Rename to requireAllClusterKeys --- .../catalyst/plans/physical/partitioning.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 80944f0c5555..e809c021f8e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -73,13 +73,13 @@ case object AllTuples extends Distribution { * Represents data where tuples that share the same values for the `clustering` * [[Expression Expressions]] will be co-located in the same partition. * - * @param requiredAllClusterKeys When true, `Partitioning` which satisfies this distribution, - * must match all `clustering` expressions in the same ordering. + * @param requireAllClusterKeys When true, `Partitioning` which satisfies this distribution, + * must match all `clustering` expressions in the same ordering. */ case class ClusteredDistribution( clustering: Seq[Expression], requiredNumPartitions: Option[Int] = None, - requiredAllClusterKeys: Boolean = SQLConf.get.getConf( + requireAllClusterKeys: Boolean = SQLConf.get.getConf( SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION)) extends Distribution { require( clustering != Nil, @@ -97,7 +97,7 @@ case class ClusteredDistribution( /** * Checks if `expressions` match all `clustering` expressions in the same ordering. * - * `Partitioning` should call this to check its expressions when `requiredAllClusterKeys` + * `Partitioning` should call this to check its expressions when `requireAllClusterKeys` * is set to true. */ def areAllClusterKeysMatched(expressions: Seq[Expression]): Boolean = { @@ -279,8 +279,8 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) } - case c @ ClusteredDistribution(requiredClustering, _, requiredAllClusterKeys) => - if (requiredAllClusterKeys) { + case c @ ClusteredDistribution(requiredClustering, _, requireAllClusterKeys) => + if (requireAllClusterKeys) { // Checks `HashPartitioning` is partitioned on exactly same clustering keys of // `ClusteredDistribution`. c.areAllClusterKeysMatched(expressions) @@ -346,9 +346,9 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) // `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) - case c @ ClusteredDistribution(requiredClustering, _, requiredAllClusterKeys) => + case c @ ClusteredDistribution(requiredClustering, _, requireAllClusterKeys) => val expressions = ordering.map(_.child) - if (requiredAllClusterKeys) { + if (requireAllClusterKeys) { // Checks `RangePartitioning` is partitioned on exactly same clustering keys of // `ClusteredDistribution`. 
c.areAllClusterKeysMatched(expressions) From 2a6b4bb7245110e2a87cf4a079afc18b2ad7b027 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 23 Feb 2022 18:49:16 -0800 Subject: [PATCH 08/10] Adjust new field position in parameter --- .../catalyst/plans/physical/partitioning.scala | 8 ++++---- .../spark/sql/catalyst/DistributionSuite.scala | 6 +++--- .../spark/sql/execution/adaptive/AQEUtils.scala | 2 +- .../streaming/FlatMapGroupsWithStateExec.scala | 6 ++++-- .../execution/streaming/statefulOperators.scala | 15 ++++++++++----- .../apache/spark/sql/execution/PlannerSuite.scala | 2 +- 6 files changed, 23 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index e809c021f8e7..78d153c5a0e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -78,9 +78,9 @@ case object AllTuples extends Distribution { */ case class ClusteredDistribution( clustering: Seq[Expression], - requiredNumPartitions: Option[Int] = None, requireAllClusterKeys: Boolean = SQLConf.get.getConf( - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION)) extends Distribution { + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION), + requiredNumPartitions: Option[Int] = None) extends Distribution { require( clustering != Nil, "The clustering expressions of a ClusteredDistribution should not be Nil. " + @@ -279,7 +279,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { case (l, r) => l.semanticEquals(r) } - case c @ ClusteredDistribution(requiredClustering, _, requireAllClusterKeys) => + case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) => if (requireAllClusterKeys) { // Checks `HashPartitioning` is partitioned on exactly same clustering keys of // `ClusteredDistribution`. @@ -346,7 +346,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) // `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. 
val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) - case c @ ClusteredDistribution(requiredClustering, _, requireAllClusterKeys) => + case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) => val expressions = ordering.map(_.child) if (requireAllClusterKeys) { // Checks `RangePartitioning` is partitioned on exactly same clustering keys of diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala index e047d4c070be..fbf037520fb0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala @@ -254,17 +254,17 @@ class DistributionSuite extends SparkFunSuite { test("Partitioning.numPartitions must match Distribution.requiredNumPartitions to satisfy it") { checkSatisfied( SinglePartition, - ClusteredDistribution(Seq($"a", $"b", $"c"), Some(10)), + ClusteredDistribution(Seq($"a", $"b", $"c"), requiredNumPartitions = Some(10)), false) checkSatisfied( HashPartitioning(Seq($"a", $"b", $"c"), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"), Some(5)), + ClusteredDistribution(Seq($"a", $"b", $"c"), requiredNumPartitions = Some(5)), false) checkSatisfied( RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10), - ClusteredDistribution(Seq($"a", $"b", $"c"), Some(5)), + ClusteredDistribution(Seq($"a", $"b", $"c"), requiredNumPartitions = Some(5)), false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala index cbd4ee698df2..51833012a128 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala @@ -37,7 +37,7 @@ object AQEUtils { } else { None } - Some(ClusteredDistribution(h.expressions, numPartitions)) + Some(ClusteredDistribution(h.expressions, requiredNumPartitions = numPartitions)) case f: FilterExec => getRequiredDistribution(f.child) case s: SortExec if !s.global => getRequiredDistribution(s.child) case c: CollectMetricsExec => getRequiredDistribution(c.child) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index 93ed5916bfb2..dfcb70737666 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -96,8 +96,10 @@ case class FlatMapGroupsWithStateExec( // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. 
// TODO(SPARK-38204) - ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) :: - ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) :: + ClusteredDistribution( + groupingAttributes, requiredNumPartitions = stateInfo.map(_.numPartitions)) :: + ClusteredDistribution( + initialStateGroupAttrs, requiredNumPartitions = stateInfo.map(_.numPartitions)) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 3ab2ad47e98c..45c6430f9642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -340,7 +340,8 @@ case class StateStoreRestoreExec( if (keyExpressions.isEmpty) { AllTuples :: Nil } else { - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil + ClusteredDistribution(keyExpressions, + requiredNumPartitions = stateInfo.map(_.numPartitions)) :: Nil } } @@ -502,7 +503,8 @@ case class StateStoreSaveExec( if (keyExpressions.isEmpty) { AllTuples :: Nil } else { - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil + ClusteredDistribution(keyExpressions, + requiredNumPartitions = stateInfo.map(_.numPartitions)) :: Nil } } @@ -582,7 +584,8 @@ case class SessionWindowStateStoreRestoreExec( // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. // TODO(SPARK-38204) - ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil + ClusteredDistribution(keyWithoutSessionExpressions, + requiredNumPartitions = stateInfo.map(_.numPartitions)) :: Nil } override def requiredChildOrdering: Seq[Seq[SortOrder]] = { @@ -696,7 +699,8 @@ case class SessionWindowStateStoreSaveExec( // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. // TODO(SPARK-38204) - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil + ClusteredDistribution(keyExpressions, + requiredNumPartitions = stateInfo.map(_.numPartitions)) :: Nil } override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = { @@ -757,7 +761,8 @@ case class StreamingDeduplicateExec( // NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution // before making any changes. 
// TODO(SPARK-38204) - ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil + ClusteredDistribution(keyExpressions, + requiredNumPartitions = stateInfo.map(_.numPartitions)) :: Nil } override protected def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 383b84dc0d8f..2ab1b6d4963a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -432,7 +432,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("EnsureRequirements should respect ClusteredDistribution's num partitioning") { - val distribution = ClusteredDistribution(Literal(1) :: Nil, Some(13)) + val distribution = ClusteredDistribution(Literal(1) :: Nil, requiredNumPartitions = Some(13)) // Number of partitions differ val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 13) val childPartitioning = HashPartitioning(Literal(1) :: Nil, 5) From 39b0504c88a6630c332b44eb331fc3554b5821d9 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 24 Feb 2022 16:57:24 -0800 Subject: [PATCH 09/10] Rewrite unit test with window query, and add more test --- .../sql/catalyst/DistributionSuite.scala | 36 +++++++++++++ .../spark/sql/DataFrameAggregateSuite.scala | 54 ++----------------- .../sql/DataFrameWindowFunctionsSuite.scala | 49 ++++++++++++++++- 3 files changed, 87 insertions(+), 52 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala index fbf037520fb0..a924a9ed02e5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala @@ -169,6 +169,24 @@ class DistributionSuite extends SparkFunSuite { ClusteredDistribution(Seq($"d", $"e")), false) + // When ClusteredDistribution.requireAllClusterKeys is set to true, + // HashPartitioning can only satisfy ClusteredDistribution iff its hash expressions are + // exactly same as the required clustering expressions. + checkSatisfied( + HashPartitioning(Seq($"a", $"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + true) + + checkSatisfied( + HashPartitioning(Seq($"b", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + false) + + checkSatisfied( + HashPartitioning(Seq($"b", $"a", $"c"), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + false) + // HashPartitioning cannot satisfy OrderedDistribution checkSatisfied( HashPartitioning(Seq($"a", $"b", $"c"), 10), @@ -249,6 +267,24 @@ class DistributionSuite extends SparkFunSuite { RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10), ClusteredDistribution(Seq($"c", $"d")), false) + + // When ClusteredDistribution.requireAllClusterKeys is set to true, + // RangePartitioning can only satisfy ClusteredDistribution iff its ordering expressions are + // exactly same as the required clustering expressions. 
+ checkSatisfied( + RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + true) + + checkSatisfied( + RangePartitioning(Seq($"a".asc, $"b".asc), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + false) + + checkSatisfied( + RangePartitioning(Seq($"b".asc, $"a".asc, $"c".asc), 10), + ClusteredDistribution(Seq($"a", $"b", $"c"), requireAllClusterKeys = true), + false) } test("Partitioning.numPartitions must match Distribution.requiredNumPartitions to satisfy it") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 29885a229d64..215d38d8b167 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -24,12 +24,10 @@ import scala.util.Random import org.scalatest.matchers.must.Matchers.the import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.{InputAdapter, WholeStageCodegenExec} +import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} -import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec} +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -1455,52 +1453,6 @@ class DataFrameAggregateSuite extends QueryTest val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id") checkAnswer(df, Row(2, 3, 1)) } - - test("SPARK-38237: require all cluster keys for child required distribution") { - def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = { - expressions.flatMap { - case ref: AttributeReference => Some(ref.name) - } - } - - def isShuffleExecByRequirement( - plan: ShuffleExchangeExec, - desiredClusterColumns: Seq[String]): Boolean = plan match { - case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) => - partitionExpressionsColumns(op.expressions) === desiredClusterColumns - case _ => false - } - - val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value") - - withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true") { - - val grouped = df - // repartition by sub group keys which satisfies ClusteredDistribution(group keys) - .repartition($"key1") - .groupBy($"key1", $"key2") - .agg(sum($"value")) - - checkAnswer(grouped, Seq(Row("a", 1, 1), Row("a", 2, 2), Row("b", 1, 7))) - - val shuffleByRequirement = grouped.queryExecution.executedPlan.flatMap { - case a if a.isInstanceOf[BaseAggregateExec] => - a.children.head match { - case InputAdapter(s: ShuffleExchangeExec) - if isShuffleExecByRequirement(s, Seq("key1", "key2")) => Some(s) - case s: ShuffleExchangeExec - if isShuffleExecByRequirement(s, Seq("key1", "key2")) 
-            case _ => None
-          }
-
-        case _ => None
-      }
-
-      assert(shuffleByRequirement.nonEmpty, "Can't find desired shuffle node from the query plan")
-    }
-  }
 }
 
 case class B(c: Option[Double])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 1491c5a4f26b..3cf61c3402bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -20,9 +20,12 @@ package org.apache.spark.sql
 import org.scalatest.matchers.must.Matchers.the
 
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1071,4 +1074,48 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       Row("a", 1, "x", "x"), Row("b", 0, null, null)))
   }
+
+  test("SPARK-38237: require all cluster keys for child required distribution for window query") {
+    def partitionExpressionsColumns(expressions: Seq[Expression]): Seq[String] = {
+      expressions.flatMap {
+        case ref: AttributeReference => Some(ref.name)
+      }
+    }
+
+    def isShuffleExecByRequirement(
+        plan: ShuffleExchangeExec,
+        desiredClusterColumns: Seq[String]): Boolean = plan match {
+      case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) =>
+        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+      case _ => false
+    }
+
+    val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value")
+    val windowSpec = Window.partitionBy("key1", "key2").orderBy("value")
+
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key -> "true") {
+
+      val windowed = df
+        // repartition by subset of window partitionBy keys which satisfies ClusteredDistribution
+        .repartition($"key1")
+        .select(
+          lead($"key1", 1).over(windowSpec),
+          lead($"value", 1).over(windowSpec))
+
+      checkAnswer(windowed, Seq(Row("b", 4), Row(null, null), Row(null, null), Row(null, null)))
+
+      val shuffleByRequirement = windowed.queryExecution.executedPlan.find {
+        case w: WindowExec =>
+          w.child.find {
+            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+            case _ => false
+          }.nonEmpty
+        case _ => false
+      }
+
+      assert(shuffleByRequirement.nonEmpty, "Can't find desired shuffle node from the query plan")
+    }
+  }
 }

From 9463fc8f230899218cfa2bfb5127c291126b04e4 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 24 Feb 2022 22:26:21 -0800
Subject: [PATCH 10/10] Fix typo in config doc

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 140586da7e58..a050156518c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -413,7 +413,7 @@ object SQLConf {
       .doc("When true, the planner requires all the clustering keys as the partition keys " +
         "(with same ordering) of the children, to eliminate the shuffle for the operator that " +
         "requires its children be clustered distributed, such as AGGREGATE and WINDOW node. " +
-        "This is to avoid data kews which can lead to significant performance regression if " +
+        "This is to avoid data skews which can lead to significant performance regression if " +
         "shuffle is eliminated.")
       .version("3.3.0")
       .booleanConf
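
Editor's note (not part of the patch series): a minimal sketch of how the opt-in described by the
config doc above might be exercised from user code. It reuses only identifiers that appear in the
diffs (SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION and the key1/key2/value query shape); the
local SparkSession setup and explain-based check are assumptions for illustration.

  // Sketch: with the flag enabled, pre-partitioning on only a subset of the group-by keys
  // no longer lets the aggregate reuse that partitioning; the planner adds a shuffle on the
  // full clustering keys instead. (The tests above disable AQE to make the exchange easy to
  // assert on; here we just inspect the plan.)
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.internal.SQLConf

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Opt in: require all clustering keys as the partition keys for AGGREGATE/WINDOW children.
  spark.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key, "true")

  val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3)).toDF("key1", "key2", "value")
  // Input is pre-partitioned on "key1" only, a subset of the group-by keys ("key1", "key2").
  val agg = df.repartition($"key1").groupBy($"key1", $"key2").sum("value")
  agg.explain() // expect an Exchange hashpartitioning(key1, key2, ...) below the aggregate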