@@ -29,7 +29,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
2929import org .apache .spark .util .MutablePair
3030
3131object Exchange {
32- /** Returns true when the ordering expressions are a subset of the key. */
32+ /**
33+ * Returns true when the ordering expressions are a subset of the key.
34+ * if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange ]].
35+ */
3336 def canSortWithShuffle (partitioning : Partitioning , desiredOrdering : Seq [SortOrder ]): Boolean = {
3437 desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
3538 }
@@ -224,7 +227,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
224227 }
225228
226229 val withSort = if (needSort) {
227- Sort (rowOrdering, global = false , withShuffle)
230+ if (sqlContext.conf.externalSortEnabled) {
231+ ExternalSort (rowOrdering, global = false , withShuffle)
232+ } else {
233+ Sort (rowOrdering, global = false , withShuffle)
234+ }
228235 } else {
229236 withShuffle
230237 }
@@ -253,7 +260,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
253260 case (UnspecifiedDistribution , Seq (), child) =>
254261 child
255262 case (UnspecifiedDistribution , rowOrdering, child) =>
256- Sort (rowOrdering, global = false , child)
263+ if (sqlContext.conf.externalSortEnabled) {
264+ ExternalSort (rowOrdering, global = false , child)
265+ } else {
266+ Sort (rowOrdering, global = false , child)
267+ }
257268
258269 case (dist, ordering, _) =>
259270 sys.error(s " Don't know how to ensure $dist with ordering $ordering" )
0 commit comments