
Commit 0082515

Some additional comments + small cleanup to remove an unused parameter
1 parent: a27cfc1

File tree

2 files changed, +11 -4 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 2 additions & 3 deletions

@@ -117,9 +117,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
-  private val rowDataTypes = child.output.map(_.dataType).toArray
-
   private val serializer: Serializer = {
+    val rowDataTypes = child.output.map(_.dataType).toArray
     // It is true when there is no field that needs to be write out.
     // For now, we will not use SparkSqlSerializer2 when noField is true.
     val noField = rowDataTypes == null || rowDataTypes.length == 0
@@ -181,7 +180,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         }
       }
     }
-    new ShuffledRowRDD(rowDataTypes, rddWithPartitionIds, serializer, part.numPartitions)
+    new ShuffledRowRDD(rddWithPartitionIds, serializer, part.numPartitions)
   }
 }
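
For readers skimming the hunks above: the only consumer of rowDataTypes is the serializer initializer, so the field becomes a local val inside that block and ShuffledRowRDD no longer receives the schema at all. Below is a minimal, standalone Scala sketch of that scoping pattern; Child, Exchange, and the serializer-name strings are illustrative stand-ins, not the actual Spark types.

object ScopedValueSketch {
  // Stand-in for the child plan: all we need is the list of output column types.
  final case class Child(outputTypes: Seq[String])

  final class Exchange(child: Child) {
    // Before the cleanup, a value like `rowDataTypes` sat at class scope even though
    // only this initializer used it; now it is local to its only consumer.
    private val serializerName: String = {
      val rowDataTypes = child.outputTypes
      if (rowDataTypes.isEmpty) "fallback-serializer"   // mirrors the noField check
      else "sql-serializer2"
    }

    def describe(): String = s"Exchange using $serializerName"
  }

  def main(args: Array[String]): Unit = {
    println(new Exchange(Child(Seq("int", "string"))).describe())
    println(new Exchange(Child(Seq.empty)).describe())
  }
}

Running main simply prints which serializer name each instance would pick, echoing the noField branch in the real code.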

sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala

Lines changed: 9 additions & 1 deletion

@@ -28,6 +28,10 @@ private class ShuffledRowRDDPartition(val idx: Int) extends Partition {
   override def hashCode(): Int = idx
 }
 
+/**
+ * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
+ * use on RDDs of (Int, Row) pairs where the Int is a partition id in the expected range).
+ */
 private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
@@ -37,9 +41,13 @@ private class PartitionIdPassthrough(override val numPartitions: Int) extends Pa
  * shuffling rows instead of Java key-value pairs. Note that something like this should eventually
  * be implemented in Spark core, but that is blocked by some more general refactorings to shuffle
  * interfaces / internals.
+ *
+ * @param prev the RDD being shuffled. Elements of this RDD are (partitionId, Row) pairs.
+ *             Partition ids should be in the range [0, numPartitions - 1].
+ * @param serializer the serializer used during the shuffle.
+ * @param numPartitions the number of post-shuffle partitions.
  */
 class ShuffledRowRDD(
-    rowSchema: Array[DataType],
     @transient var prev: RDD[Product2[Int, InternalRow]],
     serializer: Serializer,
     numPartitions: Int)
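
The new scaladoc describes a partitioner that simply echoes a pre-computed partition id back to the shuffle machinery. Here is a hedged, self-contained sketch of that idea against Spark core's public Partitioner API; PassthroughPartitioner, the toy length-based partition ids, and the local[2] setup are illustrative and not part of this commit.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Same shape as PartitionIdPassthrough in the diff: the key already *is* the
// partition id, so getPartition just casts and returns it, with no hashing.
class PassthroughPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

object PassthroughExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("passthrough").setMaster("local[2]"))
    // Pre-compute a partition id for each record (here: a toy rule based on string length).
    val withIds = sc.parallelize(Seq("a", "bb", "ccc", "dddd")).map(s => (s.length % 2, s))
    // partitionBy routes each record to the partition named by its key.
    val shuffled = withIds.partitionBy(new PassthroughPartitioner(2))
    shuffled.glom().collect().zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: ${part.mkString(", ")}")
    }
    sc.stop()
  }
}

In the real ShuffledRowRDD the same trick lets Exchange compute each row's partition id once, up front, which is what the @param prev note is getting at.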

0 commit comments
