@sunchao sunchao commented Aug 31, 2023

What changes were proposed in this pull request?

In SPJ, the logic to handle partially clustered distribution is currently a bit complicated. For instance, when the feature is enabled (by turning on both `conf.v2BucketingPushPartValuesEnabled` and `conf.v2BucketingPartiallyClusteredDistributionEnabled`), Spark should postpone the combining of input splits until it is about to create an input RDD in `BatchScanExec`. To implement this, `groupPartitions` in `DataSourceV2ScanExecBase` currently takes the flag as input and has two different behaviors, which can be confusing.

This PR introduces a new field in `KeyGroupedPartitioning`, named `originalPartitionValues`, which stores the original partition values from the input before split combining is applied. The field is used when partially clustered distribution is enabled. With this, `groupPartitions` becomes easier to understand.

In addition, this PR simplifies `BatchScanExec.inputRDD` by combining the two branches where partially clustered distribution is not enabled.
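As a rough sketch of the shape of this change (illustrative only: the type parameters below stand in for Spark's internal Expression and InternalRow, and the exact signature in the patch may differ), the partitioning now carries both the grouped and the original partition values:

// Hedged sketch of the change described above; Expr and Row are stand-ins
// for Spark's internal Expression and InternalRow types.
case class KeyGroupedPartitioning[Expr, Row](
    expressions: Seq[Expr],            // partition expressions for the partitioning
    numPartitions: Int,                // the number of partitions
    partitionValues: Seq[Row],         // values for the final cluster keys, after grouping
    originalPartitionValues: Seq[Row]) // input partition values captured before splits are
                                       // combined; used only when partially clustered
                                       // distribution is enabled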

Why are the changes needed?

To simplify the current logic in SPJ w.r.t. partially clustered distribution.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

@github-actions github-actions bot added the SQL label Aug 31, 2023

@szehon-ho szehon-ho left a comment


Yeah, it's cleaner and a bit easier to read than before.

*
* @param expressions partition expressions for the partitioning.
* @param numPartitions the number of partitions
* @param partitionValues the values for the cluster keys of the distribution, must be
Member

What do you think about adding 'final cluster keys' to the javadoc, to make it even clearer?

Member Author

Sure
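Applied to the excerpt above, the updated line would read roughly as follows (the exact merged wording is an assumption):

 * @param partitionValues the values for the final cluster keys of the distribution, must be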

* Information about key-grouped partitions, which contains a list of grouped partitions as well
* as the original input partitions before the grouping.
*/
private[v2] case class KeyGroupedPartitionInfo(
Member

It seems like it would refer to info about one `KeyGroupedPartition`. How about `KeyGroupedPartitionInfos`?

Member Author

I thought about it, but is 'Infos' a proper plural noun?
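For reference, a minimal hypothetical sketch of the helper being named here, based on the doc comment above (field names and types are assumptions, with type parameters standing in for Spark's internal classes):

// Hypothetical sketch: bundles the grouped partitions together with the
// original input partitions before the grouping, per the doc comment above.
case class KeyGroupedPartitionInfo[GroupedPart, InputPart](
    groupedParts: Seq[GroupedPart],  // partitions after grouping by partition key
    originalParts: Seq[InputPart])   // original input partitions before grouping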

(splits.head.asInstanceOf[HasPartitionKey].partitionKey(), splits)
})

// This means the input partitions are not grouped by partition values. We'll need to
Member

Nit: can we clarify 'this'? E.g., "When partially clustered, input partitions are not grouped by partition values".

Nit: `groupByPartitionValues` seems to never actually be defined; can we fix it? Does it refer to `groupedPartitions`?

Member Author

Yes, let me improve these comments too.

@sunchao sunchao commented Sep 1, 2023

cc @viirya @dongjoon-hyun @cloud-fan @Hisoka-X too


@Hisoka-X Hisoka-X left a comment


Looking great!


@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM.

@dongjoon-hyun

Merged to master for Apache Spark 4.0.0.

@sunchao sunchao commented Sep 5, 2023

Thanks all!

@sunchao sunchao deleted the SPARK-45036 branch September 5, 2023 16:28
@LuciferYang LuciferYang commented Sep 6, 2023

@sunchao After this PR was merged, there are test failures in the daily Scala 2.13 tests. We can reproduce the issue locally by executing the following commands:

dev/change-scala-version.sh 2.13
build/sbt "sql/testOnly org.apache.spark.sql.connector.KeyGroupedPartitioningSuite" -Pscala-2.13
[info] - clustered distribution: output partitioning should be KeyGroupedPartitioning *** FAILED *** (1 second, 895 milliseconds)
[info]   KeyGroupedPartitioning(List(transformexpression(org.apache.spark.sql.connector.catalog.functions.YearsFunction$@92fa663, ts#11, None)), 3, List([50], [51], [52]), List([50], [51], [52])) did not equal KeyGroupedPartitioning(ArraySeq(transformexpression(org.apache.spark.sql.connector.catalog.functions.YearsFunction$@92fa663, ts#11, None)), 3, List([51], [50], [52]), ArraySeq([50], [51], [52])) (KeyGroupedPartitioningSuite.scala:237)
[info]   Analysis:
[info]   KeyGroupedPartitioning(partitionValues: List(0: [50] -> [51], 1: [51] -> [50]), uniquePartitionValues: List(0: [50] -> [51], 1: [51] -> [50]))
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.connector.KeyGroupedPartitioningSuite.checkQueryPlan(KeyGroupedPartitioningSuite.scala:237)
[info]   at org.apache.spark.sql.connector.KeyGroupedPartitioningSuite.$anonfun$new$4(KeyGroupedPartitioningSuite.scala:103)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.org$scalatest$BeforeAndAfter$$super$runTest(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.runTest(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.org$scalatest$BeforeAndAfter$$super$run(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.run(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
...
[info] - SPARK-41471: shuffle one side: work with group partition split *** FAILED *** (185 milliseconds)
[info]   Results do not match for query:
[info]   Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
[info]   Timezone Env: 
[info]   
[info]   == Parsed Logical Plan ==
[info]   'Sort ['id ASC NULLS FIRST, 'purchase_price ASC NULLS FIRST, 'sale_price ASC NULLS FIRST], true
[info]   +- 'Project ['id, 'name, 'i.price AS purchase_price#5710, 'p.price AS sale_price#5711]
[info]      +- 'Join Inner, ('i.id = 'p.item_id)
[info]         :- 'SubqueryAlias i
[info]         :  +- 'UnresolvedRelation [testcat, ns, items], [], false
[info]         +- 'SubqueryAlias p
[info]            +- 'UnresolvedRelation [testcat, ns, purchases], [], false
[info]   
[info]   == Analyzed Logical Plan ==
[info]   id: bigint, name: string, purchase_price: float, sale_price: float
[info]   Sort [id#5712L ASC NULLS FIRST, purchase_price#5710 ASC NULLS FIRST, sale_price#5711 ASC NULLS FIRST], true
[info]   +- Project [id#5712L, name#5713, price#5714 AS purchase_price#5710, price#5717 AS sale_price#5711]
[info]      +- Join Inner, (id#5712L = item_id#5716L)
[info]         :- SubqueryAlias i
[info]         :  +- SubqueryAlias testcat.ns.items
[info]         :     +- RelationV2[id#5712L, name#5713, price#5714, arrive_time#5715] testcat.ns.items testcat.ns.items
[info]         +- SubqueryAlias p
[info]            +- SubqueryAlias testcat.ns.purchases
[info]               +- RelationV2[item_id#5716L, price#5717, time#5718] testcat.ns.purchases testcat.ns.purchases
[info]   
[info]   == Optimized Logical Plan ==
[info]   Sort [id#5712L ASC NULLS FIRST, purchase_price#5710 ASC NULLS FIRST, sale_price#5711 ASC NULLS FIRST], true
[info]   +- Project [id#5712L, name#5713, price#5714 AS purchase_price#5710, price#5717 AS sale_price#5711]
[info]      +- Join Inner, (id#5712L = item_id#5716L)
[info]         :- Filter isnotnull(id#5712L)
[info]         :  +- RelationV2[id#5712L, name#5713, price#5714] testcat.ns.items
[info]         +- Filter isnotnull(item_id#5716L)
[info]            +- RelationV2[item_id#5716L, price#5717] testcat.ns.purchases
[info]   
[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      AQEShuffleRead local
[info]      +- ShuffleQueryStage 1
[info]         +- Exchange rangepartitioning(id#5712L ASC NULLS FIRST, purchase_price#5710 ASC NULLS FIRST, sale_price#5711 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=22114]
[info]            +- *(4) Project [id#5712L, name#5713, price#5714 AS purchase_price#5710, price#5717 AS sale_price#5711]
[info]               +- *(4) SortMergeJoin [id#5712L], [item_id#5716L], Inner
[info]                  :- *(2) Sort [id#5712L ASC NULLS FIRST], false, 0
[info]                  :  +- *(2) Project [id#5712L, name#5713, price#5714]
[info]                  :     +- *(2) Filter isnotnull(id#5712L)
[info]                  :        +- BatchScan testcat.ns.items[id#5712L, name#5713, price#5714] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
[info]                  +- *(3) Sort [item_id#5716L ASC NULLS FIRST], false, 0
[info]                     +- ShuffleQueryStage 0
[info]                        +- Exchange KeyGroupedPartitioning(Vector(item_id#5716L),3,List([4], [3], [1]),List()), ENSURE_REQUIREMENTS, [plan_id=22048]
[info]                           +- *(1) Project [item_id#5716L, price#5717]
[info]                              +- *(1) Filter isnotnull(item_id#5716L)
[info]                                 +- BatchScan testcat.ns.purchases[item_id#5716L, price#5717] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
[info]   +- == Initial Plan ==
[info]      Sort [id#5712L ASC NULLS FIRST, purchase_price#5710 ASC NULLS FIRST, sale_price#5711 ASC NULLS FIRST], true, 0
[info]      +- Exchange rangepartitioning(id#5712L ASC NULLS FIRST, purchase_price#5710 ASC NULLS FIRST, sale_price#5711 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=21858]
[info]         +- Project [id#5712L, name#5713, price#5714 AS purchase_price#5710, price#5717 AS sale_price#5711]
[info]            +- SortMergeJoin [id#5712L], [item_id#5716L], Inner
[info]               :- Sort [id#5712L ASC NULLS FIRST], false, 0
[info]               :  +- Project [id#5712L, name#5713, price#5714]
[info]               :     +- Filter isnotnull(id#5712L)
[info]               :        +- BatchScan testcat.ns.items[id#5712L, name#5713, price#5714] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
[info]               +- Sort [item_id#5716L ASC NULLS FIRST], false, 0
[info]                  +- Exchange KeyGroupedPartitioning(Vector(item_id#5716L),3,List([4], [3], [1]),List()), ENSURE_REQUIREMENTS, [plan_id=21852]
[info]                     +- Project [item_id#5716L, price#5717]
[info]                        +- Filter isnotnull(item_id#5716L)
[info]                           +- BatchScan testcat.ns.purchases[item_id#5716L, price#5717] class org.apache.spark.sql.connector.catalog.InMemoryBaseTable$InMemoryBatchScan RuntimeFilters: []
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 2 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<id:bigint,name:string,purchase_price:float,sale_price:float>
[info]   ![1,aa,40.0,42.0]           [3,bb,10.0,19.5]
[info]   ![3,bb,10.0,19.5] (QueryTest.scala:244)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.apache.spark.sql.QueryTest$.newAssertionFailedException(QueryTest.scala:234)
[info]   at org.scalatest.Assertions.fail(Assertions.scala:933)
[info]   at org.scalatest.Assertions.fail$(Assertions.scala:929)
[info]   at org.apache.spark.sql.QueryTest$.fail(QueryTest.scala:234)
[info]   at org.apache.spark.sql.QueryTest$.checkAnswer(QueryTest.scala:244)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:151)
[info]   at org.apache.spark.sql.connector.KeyGroupedPartitioningSuite.$anonfun$new$125(KeyGroupedPartitioningSuite.scala:1219)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
[info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:247)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:245)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.withSQLConf(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.apache.spark.sql.connector.KeyGroupedPartitioningSuite.$anonfun$new$124(KeyGroupedPartitioningSuite.scala:1214)
[info]   at org.apache.spark.sql.connector.KeyGroupedPartitioningSuite.$anonfun$new$124$adapted(KeyGroupedPartitioningSuite.scala:1210)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at org.apache.spark.sql.connector.KeyGroupedPartitioningSuite.$anonfun$new$123(KeyGroupedPartitioningSuite.scala:1210)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.org$scalatest$BeforeAndAfter$$super$runTest(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.runTest(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.org$scalatest$BeforeAndAfter$$super$run(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.connector.DistributionAndOrderingSuiteBase.run(DistributionAndOrderingSuiteBase.scala:33)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)
[info] Run completed in 20 seconds, 935 milliseconds.
[info] Total number of tests run: 32
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 30, failed 2, canceled 0, ignored 0, pending 0
[info] *** 2 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.connector.KeyGroupedPartitioningSuite

Running git revert 9e2aafb13739f9c07f8218cd325c5532063b1a51 to revert this PR and then executing the above commands, all tests pass:

[info] Run completed in 20 seconds, 857 milliseconds.
[info] Total number of tests run: 32
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 32, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Do you have time to fix this? @sunchao

https://github.com/apache/spark/actions/runs/6088706713/job/16519965988


val sortedKeyToPartitions = results.sorted(partitionOrdering)
val groupedPartitions = sortedKeyToPartitions
.map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
.groupBy(_._1)

@LuciferYang LuciferYang Sep 6, 2023


The problem likely comes from this `groupBy`, as there are some behavioral differences between Scala 2.12 and Scala 2.13.

For example:

Scala 2.12.18:

Welcome to Scala 2.12.18 (OpenJDK 64-Bit Server VM, Java 1.8.0_382).
Type in expressions for evaluation. Or try :help.

scala> val input = Seq((50,50),(51,51),(52,52))
input: Seq[(Int, Int)] = List((50,50), (51,51), (52,52))

scala> input.groupBy(_._1).toSeq
res0: Seq[(Int, Seq[(Int, Int)])] = Vector((50,List((50,50))), (51,List((51,51))), (52,List((52,52))))

Scala 2.13.8:

Welcome to Scala 2.13.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_382).
Type in expressions for evaluation. Or try :help.

scala> val input = Seq((50,50),(51,51),(52,52))
val input: Seq[(Int, Int)] = List((50,50), (51,51), (52,52))

scala> input.groupBy(_._1).toSeq
val res0: Seq[(Int, Seq[(Int, Int)])] = List((52,List((52,52))), (50,List((50,50))), (51,List((51,51))))

We can see that when using Scala 2.13.8, the order of the results has changed.

Possible fixes might be:

  1. Replacing groupBy with another function that maintains the input order, such as foldLeft with a LinkedHashMap (see the sketch below)?
  2. Re-sorting the groupedPartitions?

Perhaps there are other better ways to fix it?
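For illustration, option 1 could look like the following minimal sketch (an assumption about the shape of a fix, not the actual patch); it groups pairs by key while preserving the order in which keys first appear, which groupBy does not guarantee (Scala 2.12 merely happened to return these keys in input order):

import scala.collection.mutable

// Group (key, value) pairs by key, preserving first-appearance order of keys.
def groupByPreservingOrder[K, V](input: Seq[(K, V)]): Seq[(K, Seq[(K, V)])] = {
  val grouped = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[(K, V)]]
  for (pair <- input) {
    grouped.getOrElseUpdate(pair._1, mutable.ArrayBuffer.empty) += pair
  }
  grouped.iterator.map { case (k, vs) => (k, vs.toSeq) }.toSeq
}

// groupByPreservingOrder(Seq((50, 50), (51, 51), (52, 52)))
// => List((50, List((50,50))), (51, List((51,51))), (52, List((52,52))))

The follow-up fix that was eventually merged (#42839, see below) instead went with option 2 and re-sorts the grouped partitions according to the partition values.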


@sunchao sunchao Sep 6, 2023


Thanks @LuciferYang for the findings! Yes, it's a bug, as I was assuming the order would be preserved by groupBy. Let me open a follow-up PR to fix this.

Member Author

I opened #42839 to fix this.

LuciferYang pushed a commit that referenced this pull request Sep 7, 2023
…ted according to partition values

### What changes were proposed in this pull request?

This PR makes sure the result grouped partitions from `DataSourceV2ScanExec#groupPartitions` are sorted according to the partition values. Previously in the #42757 we were assuming Scala would preserve the input ordering but apparently that's not the case.

### Why are the changes needed?

See #42757 (comment) for diagnosis. The partition ordering is a fundamental property for SPJ and thus must be guaranteed.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

We have tests in `KeyGroupedPartitioningSuite` to cover this.

### Was this patch authored or co-authored using generative AI tooling?

Closes #42839 from sunchao/SPARK-45036-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…red distribution


Closes apache#42757 from sunchao/SPARK-45036.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…ted according to partition values


Closes apache#42839 from sunchao/SPARK-45036-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…red distribution


Closes apache#42757 from sunchao/SPARK-45036.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>