
Commit d74dee1

rxin authored and gatorsmile committed
[SPARK-22153][SQL] Rename ShuffleExchange -> ShuffleExchangeExec
## What changes were proposed in this pull request?

For some reason when we added the Exec suffix to all physical operators, we missed this one. I was looking for this physical operator today and couldn't find it, because I was looking for ExchangeExec.

## How was this patch tested?

This is a simple rename and should be covered by existing tests.

Author: Reynold Xin <[email protected]>

Closes #19376 from rxin/SPARK-22153.
1 parent 01bd00d commit d74dee1
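
For downstream code that referenced the old name, the change is mechanical, as the diffs below show at every import site:

```scala
// Before this commit:
import org.apache.spark.sql.execution.exchange.ShuffleExchange
// After this commit, matching the Exec suffix used by the other physical operators:
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
```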

File tree

14 files changed (+95 lines, -94 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 3 additions & 3 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
@@ -411,7 +411,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
-          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+          ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
@@ -446,7 +446,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, numPartitions) =>
-        exchange.ShuffleExchange(HashPartitioning(
+        exchange.ShuffleExchangeExec(HashPartitioning(
           expressions, numPartitions), planLater(child)) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
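
Context, not part of the diff: this strategy is what turns a logical `Repartition` into the renamed operator. A minimal sketch of observing that, assuming a local `SparkSession`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

// repartition(4) plans logical.Repartition(shuffle = true), which the strategy
// above maps to ShuffleExchangeExec(RoundRobinPartitioning(4), planLater(child)).
val plan = spark.range(100).repartition(4).queryExecution.executedPlan
assert(plan.collect { case e: ShuffleExchangeExec => e }.nonEmpty)
```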

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 13 additions & 13 deletions
@@ -27,8 +27,8 @@ import org.apache.spark.sql.internal.SQLConf
  * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
  * of input data meets the
  * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
- * each operator by inserting [[ShuffleExchange]] Operators where required. Also ensure that the
- * input partition ordering requirements are met.
+ * each operator by inserting [[ShuffleExchangeExec]] Operators where required. Also ensure that
+ * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
@@ -57,17 +57,17 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchange]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchange]]s support [[ExchangeCoordinator]].
+   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
+   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
    */
   private def withExchangeCoordinator(
       children: Seq[SparkPlan],
       requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
     val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchange])) {
+      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
         // Right now, ExchangeCoordinator only support HashPartitionings.
         children.forall {
-          case e @ ShuffleExchange(hash: HashPartitioning, _, _) => true
+          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
           case child =>
             child.outputPartitioning match {
               case hash: HashPartitioning => true
@@ -94,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           targetPostShuffleInputSize,
           minNumPostShufflePartitions)
       children.zip(requiredChildDistributions).map {
-        case (e: ShuffleExchange, _) =>
+        case (e: ShuffleExchangeExec, _) =>
           // This child is an Exchange, we need to add the coordinator.
           e.copy(coordinator = Some(coordinator))
         case (child, distribution) =>
@@ -138,7 +138,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           val targetPartitioning =
             createPartitioning(distribution, defaultNumPreShufflePartitions)
           assert(targetPartitioning.isInstanceOf[HashPartitioning])
-          ShuffleExchange(targetPartitioning, child, Some(coordinator))
+          ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
         }
     } else {
       // If we do not need ExchangeCoordinator, the original children are returned.
@@ -162,7 +162,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
-        ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
+        ShuffleExchangeExec(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
     }
 
     // If the operator has multiple children and specifies child output distributions (e.g. join),
@@ -215,8 +215,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         child match {
           // If child is an exchange, we replace it with
           // a new one having targetPartitioning.
-          case ShuffleExchange(_, c, _) => ShuffleExchange(targetPartitioning, c)
-          case _ => ShuffleExchange(targetPartitioning, child)
+          case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(targetPartitioning, c)
+          case _ => ShuffleExchangeExec(targetPartitioning, child)
         }
       }
     }
@@ -246,9 +246,9 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ ShuffleExchange(partitioning, child, _) =>
+    case operator @ ShuffleExchangeExec(partitioning, child, _) =>
       child.children match {
-        case ShuffleExchange(childPartitioning, baseChild, _)::Nil =>
+        case ShuffleExchangeExec(childPartitioning, baseChild, _)::Nil =>
           if (childPartitioning.guarantees(partitioning)) child else operator
         case _ => operator
       }
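
Context, not part of the diff: EnsureRequirements is the rule that inserts the renamed operator wherever a child's output partitioning cannot satisfy an operator's required distribution. A hedged sketch of seeing it fire for an aggregation, reusing the `spark` session and import from the previous sketch:

```scala
import spark.implicits._

// groupBy("id") requires a hash distribution on `id`, so EnsureRequirements
// inserts a ShuffleExchangeExec between the partial and final aggregates.
val agg = Seq((1, "a"), (2, "b"), (1, "c")).toDF("id", "value").groupBy("id").count()
val shuffles = agg.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }
assert(shuffles.size == 1) // expected: a single shuffle for this simple aggregate
```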

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala

Lines changed: 19 additions & 19 deletions
@@ -35,9 +35,9 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *
  * A coordinator is constructed with three parameters, `numExchanges`,
  * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- *  - `numExchanges` is used to indicated that how many [[ShuffleExchange]]s that will be registered
- *    to this coordinator. So, when we start to do any actual work, we have a way to make sure that
- *    we have got expected number of [[ShuffleExchange]]s.
+ *  - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be
+ *    registered to this coordinator. So, when we start to do any actual work, we have a way to
+ *    make sure that we have got expected number of [[ShuffleExchangeExec]]s.
  *  - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
  *    input data size. With this parameter, we can estimate the number of post-shuffle partitions.
  *    This parameter is configured through
@@ -47,28 +47,28 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *    partitions.
  *
  * The workflow of this coordinator is described as follows:
- *  - Before the execution of a [[SparkPlan]], for a [[ShuffleExchange]] operator,
+ *  - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
  *    if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
  *    This happens in the `doPrepare` method.
- *  - Once we start to execute a physical plan, a [[ShuffleExchange]] registered to this
+ *  - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
  *    coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
  *    [[ShuffledRowRDD]].
- *    If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchange]]
+ *    If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
  *    will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
  *  - If this coordinator has not made the decision on how to shuffle data, it will ask those
- *    registered [[ShuffleExchange]]s to submit their pre-shuffle stages. Then, based on the
+ *    registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
  *    size statistics of pre-shuffle partitions, this coordinator will determine the number of
  *    post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
  *    to a single post-shuffle partition whenever necessary.
  *  - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- *    [[ShuffleExchange]]s. So, when a [[ShuffleExchange]] calls `postShuffleRDD`, this coordinator
- *    can lookup the corresponding [[RDD]].
+ *    [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
+ *    coordinator can lookup the corresponding [[RDD]].
  *
  * The strategy used to determine the number of post-shuffle partitions is described as follows.
  * To determine the number of post-shuffle partitions, we have a target input size for a
  * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[ShuffleExchange]]s, we will do a pass of those statistics and
- * pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
+ * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
+ * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
  * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
  * greater than the target size.
  *
@@ -89,23 +89,23 @@ class ExchangeCoordinator(
   extends Logging {
 
   // The registered Exchange operators.
-  private[this] val exchanges = ArrayBuffer[ShuffleExchange]()
+  private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
 
   // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[ShuffleExchange, ShuffledRowRDD] =
-    new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+  private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
+    new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
   // A boolean that indicates if this coordinator has made decision on how to shuffle data.
   // This variable will only be updated by doEstimationIfNecessary, which is protected by
   // synchronized.
   @volatile private[this] var estimated: Boolean = false
 
   /**
-   * Registers a [[ShuffleExchange]] operator to this coordinator. This method is only allowed to
-   * be called in the `doPrepare` method of a [[ShuffleExchange]] operator.
+   * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed
+   * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
    */
   @GuardedBy("this")
-  def registerExchange(exchange: ShuffleExchange): Unit = synchronized {
+  def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
     exchanges += exchange
   }
 
@@ -200,7 +200,7 @@ class ExchangeCoordinator(
     // Make sure we have the expected number of registered Exchange operators.
     assert(exchanges.length == numExchanges)
 
-    val newPostShuffleRDDs = new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+    val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
 
     // Submit all map stages
     val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
@@ -255,7 +255,7 @@ class ExchangeCoordinator(
     }
   }
 
-  def postShuffleRDD(exchange: ShuffleExchange): ShuffledRowRDD = {
+  def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
     doEstimationIfNecessary()
 
     if (!postShuffleRDDs.containsKey(exchange)) {
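
Aside, not part of the diff: the doc comment above describes the coordinator's packing strategy precisely enough to sketch standalone. A hypothetical simplification (the real coordinator works from map-output size statistics and also respects `minNumPostShufflePartitions`):

```scala
import scala.collection.mutable.ArrayBuffer

// Walk pre-shuffle partitions in index order; start a new post-shuffle
// partition whenever adding the next one would exceed the target size.
def packPartitions(preShuffleSizes: Array[Long], targetSize: Long): Array[Int] = {
  val startIndices = ArrayBuffer(0)
  var currentSize = 0L
  for (i <- preShuffleSizes.indices) {
    if (currentSize > 0 && currentSize + preShuffleSizes(i) > targetSize) {
      startIndices += i // pre-shuffle partition i opens a new post-shuffle partition
      currentSize = 0L
    }
    currentSize += preShuffleSizes(i)
  }
  startIndices.toArray
}

// With a 100-byte target, sizes 60, 50, 30, 80 pack into three post-shuffle
// partitions starting at pre-shuffle indices 0, 1, and 3.
assert(packPartitions(Array(60L, 50L, 30L, 80L), 100L).sameElements(Array(0, 1, 3)))
```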

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 5 additions & 5 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.util.MutablePair
 /**
  * Performs a shuffle that will result in the desired `newPartitioning`.
  */
-case class ShuffleExchange(
+case class ShuffleExchangeExec(
     var newPartitioning: Partitioning,
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
@@ -84,7 +84,7 @@ case class ShuffleExchange(
    */
   private[exchange] def prepareShuffleDependency()
     : ShuffleDependency[Int, InternalRow, InternalRow] = {
-    ShuffleExchange.prepareShuffleDependency(
+    ShuffleExchangeExec.prepareShuffleDependency(
       child.execute(), child.output, newPartitioning, serializer)
   }
 
@@ -129,9 +129,9 @@ case class ShuffleExchange(
   }
 }
 
-object ShuffleExchange {
-  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchange = {
-    ShuffleExchange(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
+object ShuffleExchangeExec {
+  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
+    ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
   }
 
   /**
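
Note, not part of the diff: the companion object's two-argument `apply` preserved here just defaults the coordinator to `None`, which is why call sites in SparkStrategies and the streaming rule can omit it. A hypothetical helper showing the short form:

```scala
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical helper: shuffle any planned operator down to one partition.
// Equivalent to ShuffleExchangeExec(SinglePartition, child, coordinator = None).
def shuffleToSinglePartition(child: SparkPlan): ShuffleExchangeExec =
  ShuffleExchangeExec(SinglePartition, child)
```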

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 3 additions & 3 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.util.Utils
 
 /**
@@ -40,7 +40,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
+      ShuffleExchangeExec.prepareShuffleDependency(
         locallyLimited, child.output, SinglePartition, serializer))
     shuffled.mapPartitionsInternal(_.take(limit))
   }
@@ -153,7 +153,7 @@ case class TakeOrderedAndProjectExec(
       }
     }
     val shuffled = new ShuffledRowRDD(
-      ShuffleExchange.prepareShuffleDependency(
+      ShuffleExchangeExec.prepareShuffleDependency(
         localTopK, child.output, SinglePartition, serializer))
     shuffled.mapPartitions { iter =>
       val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
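
Aside, not part of the diff: `CollectLimitExec` above uses a take-shuffle-take pattern — each partition is limited locally so at most `limit` rows per partition cross the network, everything lands in a single partition, and the limit is applied once more. The same idea on plain RDDs, as a rough sketch (the real operator works on `InternalRow` with a row serializer):

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Rough RDD-level sketch of the take-shuffle-take idea behind CollectLimitExec.
def collectLimit[T: ClassTag](rdd: RDD[T], limit: Int): Array[T] = {
  val locallyLimited = rdd.mapPartitions(_.take(limit)) // cap what each partition ships
  val onePartition = locallyLimited.coalesce(1, shuffle = true) // stand-in for the SinglePartition shuffle
  onePartition.mapPartitions(_.take(limit)).collect() // final global limit
}
```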

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

Lines changed: 2 additions & 2 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.streaming.OutputMode
 
 /**
@@ -155,7 +155,7 @@ object EnsureStatefulOpPartitioning extends Rule[SparkPlan] {
           child.execute().getNumPartitions == expectedPartitioning.numPartitions) {
         child
       } else {
-        ShuffleExchange(expectedPartitioning, child)
+        ShuffleExchangeExec(expectedPartitioning, child)
       }
     }
     so.withNewChildren(children)

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
-import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
@@ -420,7 +420,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
    * Verifies that the plan for `df` contains `expected` number of Exchange operators.
    */
   private def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
-    assert(df.queryExecution.executedPlan.collect { case e: ShuffleExchange => e }.size == expected)
+    assert(
+      df.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size == expected)
   }
 
   test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
 import org.apache.spark.sql.execution.{FilterExec, QueryExecution}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
@@ -1529,7 +1529,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
           fail("Should not have back to back Aggregates")
         }
         atFirstAgg = true
-      case e: ShuffleExchange => atFirstAgg = false
+      case e: ShuffleExchangeExec => atFirstAgg = false
       case _ =>
     }
   }
@@ -1710,19 +1710,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val plan = join.queryExecution.executedPlan
     checkAnswer(join, df)
     assert(
-      join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
+      join.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size === 1)
     assert(
       join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 1)
     val broadcasted = broadcast(join)
     val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
     checkAnswer(join2, df)
     assert(
-      join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1)
+      join2.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => true }.size == 1)
     assert(
       join2.queryExecution.executedPlan
         .collect { case e: BroadcastExchangeExec => true }.size === 1)
     assert(
-      join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 4)
+      join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size == 4)
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1206,7 +1206,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val agg = cp.groupBy('id % 2).agg(count('id))
 
     agg.queryExecution.executedPlan.collectFirst {
-      case ShuffleExchange(_, _: RDDScanExec, _) =>
+      case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
       case BroadcastExchangeExec(_, _: RDDScanExec) =>
     }.foreach { _ =>
       fail(
