Skip to content

Commit c0b3e45

Browse files
rednaxelafxgatorsmile
authored andcommitted
[SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator
## What changes were proposed in this pull request? A one-liner change in `ShuffleExchange.nodeName` to cover the case when `coordinator` is `null`, so that the match expression is exhaustive. Please refer to [SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a description of the symptoms. TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the Executor side can hit a case where the `coordinator` field of a `ShuffleExchange` is null, and thus will trigger a `MatchError` in `ShuffleExchange.nodeName()`'s inexhaustive match expression. Also changed two other match conditions in `ShuffleExchange` on the `coordinator` field to be consistent. ## How was this patch tested? Manually tested this change with a case where the `coordinator` is null to make sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` any more. Author: Kris Mok <[email protected]> Closes #18095 from rednaxelafx/shuffleexchange-nodename.
1 parent 95aef66 commit c0b3e45

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,17 @@ case class ShuffleExchange(
4040
child: SparkPlan,
4141
@transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
4242

43+
// NOTE: coordinator can be null after serialization/deserialization,
44+
// e.g. it can be null on the Executor side
45+
4346
override lazy val metrics = Map(
4447
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
4548

4649
override def nodeName: String = {
4750
val extraInfo = coordinator match {
4851
case Some(exchangeCoordinator) =>
4952
s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
50-
case None => ""
53+
case _ => ""
5154
}
5255

5356
val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
7073
// the plan.
7174
coordinator match {
7275
case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
73-
case None =>
76+
case _ =>
7477
}
7578
}
7679

@@ -117,7 +120,7 @@ case class ShuffleExchange(
117120
val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
118121
assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
119122
shuffleRDD
120-
case None =>
123+
case _ =>
121124
val shuffleDependency = prepareShuffleDependency()
122125
preparePostShuffleRDD(shuffleDependency)
123126
}

0 commit comments

Comments
 (0)