
Commit 8784bd9

Giant comment explaining compatibleWith vs. guarantees
1 parent 1307c50 commit 8784bd9

3 files changed: +83 additions, -14 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 80 additions & 11 deletions
@@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
   def clustering: Set[Expression] = ordering.map(_.child).toSet
 }
 
+/**
+ * Describes how an operator's output is split across partitions. The `satisfies`,
+ * `compatibleWith`, and `guarantees` methods describe relationships between child partitionings,
+ * target partitionings, and [[Distribution]]s. These relations are described more precisely in
+ * their individual method docs, but at a high level:
+ *
+ *  - `satisfies` is a relationship between partitionings and distributions.
+ *  - `compatibleWith` is a relationship between an operator's child output partitionings.
+ *  - `guarantees` is a relationship between a child's existing output partitioning and a target
+ *    output partitioning.
+ *
+ * Diagrammatically:
+ *
+ *          +--------------+
+ *          | Distribution |
+ *          +--------------+
+ *                 ^
+ *                 |
+ *             satisfies
+ *                 |
+ *          +--------------+                  +--------------+
+ *          |    Child     |                  |    Target    |
+ *     +----| Partitioning |----guarantees--->| Partitioning |
+ *     |    +--------------+                  +--------------+
+ *     |           ^
+ *     |           |
+ *     |    compatibleWith
+ *     |           |
+ *     +-----------+
+ */
 sealed trait Partitioning {
   /** Returns the number of partitions that the data is split across */
   val numPartitions: Int
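
To make the three relations in the trait-level comment above concrete, here is a small REPL-style sketch. It is illustrative only and not part of this commit: the `key` attribute and the partition counts are made up, and `spark-catalyst` is assumed to be on the classpath.

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.physical._
    import org.apache.spark.sql.types.IntegerType

    val key = AttributeReference("key", IntegerType)()

    val childPartitioning  = HashPartitioning(Seq(key), numPartitions = 8)
    val targetPartitioning = HashPartitioning(Seq(key), numPartitions = 8)

    // satisfies: a partitioning versus a required Distribution.
    childPartitioning.satisfies(ClusteredDistribution(Seq(key)))    // true

    // guarantees: the child's existing partitioning versus a target partitioning;
    // per this commit it defaults to strict equality.
    childPartitioning.guarantees(targetPartitioning)                // true
    childPartitioning.guarantees(HashPartitioning(Seq(key), 16))    // false: partition counts differ

    // compatibleWith: sibling output partitionings of a multi-child operator; for
    // hash partitionings it compares the clustering keys and the partition count.
    childPartitioning.compatibleWith(targetPartitioning)            // true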
@@ -87,12 +118,50 @@ sealed trait Partitioning {
    */
   def satisfies(required: Distribution): Boolean
 
+  /**
+   * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees
+   * the same partitioning scheme described by `other`. If `A.guarantees(B)`, then repartitioning
+   * the child's output according to `B` will be unnecessary. `guarantees` is used as a
+   * performance optimization to allow the exchange planner to avoid redundant repartitionings.
+   * By default, a partitioning only guarantees partitionings that are equal to itself (i.e. the
+   * same number of partitions, same strategy (range or hash), etc).
+   *
+   * In order to enable more aggressive optimization, this strict equality check can be relaxed.
+   * For example, say that the planner needs to repartition all of an operator's children so that
+   * they satisfy the [[AllTuples]] distribution. One way to do this is to repartition all
+   * children to have the [[SinglePartition]] partitioning. If one of the operator's children
+   * already happens to be hash-partitioned with a single partition then we do not need to
+   * re-shuffle this child; this repartitioning can be avoided if a single-partition
+   * [[HashPartitioning]] `guarantees` [[SinglePartition]].
+   *
+   * The SinglePartition example given above is not particularly interesting; the real value of
+   * `guarantees` appears for more advanced partitioning strategies. SPARK-7871 will introduce a
+   * notion of null-safe partitionings, under which partitionings can specify whether rows whose
+   * partitioning keys contain null values will be grouped into the same partition or whether
+   * they will have an unknown / random distribution. If a partitioning does not require nulls to
+   * be clustered then a partitioning which _does_ cluster nulls will guarantee the null-clustered
+   * partitioning. The converse is not true, however: a partitioning which clusters nulls cannot
+   * be guaranteed by one which does not cluster them. Thus, in general `guarantees` is not a
+   * symmetric relation.
+   *
+   * Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows
+   * produced by `A` could have also been produced by `B`.
+   */
+  def guarantees(other: Partitioning): Boolean = this == other
+
   /**
    * Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
    * guarantees the same partitioning scheme described by `other`.
+   *
+   * Compatibility of partitionings is only checked for operators that have multiple children
+   * and that require a specific child output [[Distribution]], such as joins.
+   *
+   * Intuitively, partitionings are compatible if they route the same partitioning key to the
+   * same partition. For instance, two hash partitionings are only compatible if they produce
+   * the same number of output partitions and hash records according to the same hash function
+   * and the same partitioning key schema.
    */
-  // TODO: Add an example once we have the `nullSafe` concept.
-  def guarantees(other: Partitioning): Boolean
+  def compatibleWith(other: Partitioning): Boolean
 }
 
 object Partitioning {
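
As a reading aid for the single-partition example in the `guarantees` Javadoc above, the sketch below shows one way such a relaxed check could be expressed. It is hypothetical: `relaxedGuarantees` is not part of Spark, this commit deliberately keeps the strict equality default, and the check is written as a free function because `Partitioning` is a sealed trait.

    import org.apache.spark.sql.catalyst.plans.physical._

    // Hypothetical helper, not part of this commit or of Spark.
    def relaxedGuarantees(child: Partitioning, target: Partitioning): Boolean =
      (child, target) match {
        // A hash partitioning with a single partition places every row in that one
        // partition, which is exactly the layout SinglePartition describes, so no
        // re-shuffle of the child would be needed.
        case (h: HashPartitioning, SinglePartition) if h.numPartitions == 1 => true
        // Otherwise fall back to the strict, equality-based default from this commit.
        case _ => child.guarantees(target)
      }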
@@ -102,10 +171,10 @@ object Partitioning {
       case Seq(a) => true
       case Seq(a, b) =>
         if (a.numPartitions != b.numPartitions) {
-          assert(!a.guarantees(b) && !b.guarantees(a))
+          assert(!a.compatibleWith(b) && !b.compatibleWith(a))
           false
         } else {
-          a.guarantees(b) && b.guarantees(a)
+          a.compatibleWith(b) && b.compatibleWith(a)
         }
     }.forall(_ == true)
   }
@@ -117,15 +186,15 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
     case _ => false
   }
 
-  override def guarantees(other: Partitioning): Boolean = false
+  override def compatibleWith(other: Partitioning): Boolean = false
 }
 
 case object SinglePartition extends Partitioning {
   val numPartitions = 1
 
   override def satisfies(required: Distribution): Boolean = true
 
-  override def guarantees(other: Partitioning): Boolean = other match {
+  override def compatibleWith(other: Partitioning): Boolean = other match {
     case SinglePartition => true
     case _ => false
   }
@@ -136,7 +205,7 @@ case object BroadcastPartitioning extends Partitioning {
 
   override def satisfies(required: Distribution): Boolean = true
 
-  override def guarantees(other: Partitioning): Boolean = other match {
+  override def compatibleWith(other: Partitioning): Boolean = other match {
     case BroadcastPartitioning => true
     case _ => false
   }
@@ -163,7 +232,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
     case _ => false
   }
 
-  override def guarantees(other: Partitioning): Boolean = other match {
+  override def compatibleWith(other: Partitioning): Boolean = other match {
     case o: HashPartitioning =>
       this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
     case _ => false
@@ -201,7 +270,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
     case _ => false
   }
 
-  override def guarantees(other: Partitioning): Boolean = other match {
+  override def compatibleWith(other: Partitioning): Boolean = other match {
     case o: RangePartitioning => this == o
     case _ => false
   }
@@ -248,8 +317,8 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
    * Returns true if any `partitioning` of this collection guarantees
    * the given [[Partitioning]].
    */
-  override def guarantees(other: Partitioning): Boolean =
-    partitionings.exists(_.guarantees(other))
+  override def compatibleWith(other: Partitioning): Boolean =
+    partitionings.exists(_.compatibleWith(other))
 
   override def toString: String = {
     partitionings.map(_.toString).mkString("(", " or ", ")")

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 1 addition & 1 deletion
@@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
       val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
         case (child, requiredDistribution) =>
           val targetPartitioning = canonicalPartitioning(requiredDistribution)
-          if (child.outputPartitioning.guarantees(targetPartitioning)) {
+          if (child.outputPartitioning.compatibleWith(targetPartitioning)) {
             child
           } else {
             Exchange(targetPartitioning, child)

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -242,8 +242,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
     assert(rightPartitioning.satisfies(distribution))
     // However, these partitionings are not compatible with each other, so we still need to
     // repartition both inputs prior to performing the join:
-    assert(!leftPartitioning.guarantees(rightPartitioning))
-    assert(!rightPartitioning.guarantees(leftPartitioning))
+    assert(!leftPartitioning.compatibleWith(rightPartitioning))
+    assert(!rightPartitioning.compatibleWith(leftPartitioning))
     val inputPlan = DummySparkPlan(
       children = Seq(
         DummySparkPlan(outputPartitioning = leftPartitioning),
