Skip to content

Commit 0983f75

Browse files
committed
More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning.
1 parent 8784bd9 commit 0983f75

File tree

2 files changed

+43
-32
lines changed

2 files changed

+43
-32
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
7676
}
7777

7878
/**
79-
* Describes how an operator's output is split across partitions. The `satisfies`,
80-
* `compatibleWith`, and `guarantees` methods describe relationships between child partitionings,
79+
* Describes how an operator's output is split across partitions. The `compatibleWith`,
80+
* `guarantees`, and `satisfies` methods describe relationships between child partitionings,
8181
* target partitionings, and [[Distribution]]s. These relations are described more precisely in
8282
* their individual method docs, but at a high level:
8383
*
@@ -118,6 +118,23 @@ sealed trait Partitioning {
118118
*/
119119
def satisfies(required: Distribution): Boolean
120120

121+
/**
122+
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
123+
* guarantees the same partitioning scheme described by `other`.
124+
*
125+
* Compatibility of partitionings is only checked for operators that have multiple children
126+
* and that require a specific child output [[Distribution]], such as joins.
127+
*
128+
* Intuitively, partitionings are compatible if they route the same partitioning key to the same
129+
* partition. For instance, two hash partitionings are only compatible if they produce the same
130+
* number of output partitionings and hash records according to the same hash function and
131+
* same partitioning key schema.
132+
*
133+
* Put another way, two partitionings are compatible with each other if they satisfy all of the
134+
* same distribution guarantees.
135+
*/
136+
def compatibleWith(other: Partitioning): Boolean
137+
121138
/**
122139
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees
123140
* the same partitioning scheme described by `other`. If a `A.guarantees(B)`, then repartitioning
@@ -148,20 +165,6 @@ sealed trait Partitioning {
148165
* produced by `A` could have also been produced by `B`.
149166
*/
150167
def guarantees(other: Partitioning): Boolean = this == other
151-
152-
/**
153-
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
154-
* guarantees the same partitioning scheme described by `other`.
155-
*
156-
* Compatibility of partitionings is only checked for operators that have multiple children
157-
* and that require a specific child output [[Distribution]], such as joins.
158-
*
159-
* Intuitively, partitionings are compatible if they route the same partitioning key to the same
160-
* partition. For instance, two hash partitionings are only compatible if they produce the same
161-
* number of output partitionings and hash records according to the same hash function and
162-
* same partitioning key schema.
163-
*/
164-
def compatibleWith(other: Partitioning): Boolean
165168
}
166169

167170
object Partitioning {
@@ -187,28 +190,18 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
187190
}
188191

189192
override def compatibleWith(other: Partitioning): Boolean = false
193+
194+
override def guarantees(other: Partitioning): Boolean = false
190195
}
191196

192197
case object SinglePartition extends Partitioning {
193198
val numPartitions = 1
194199

195200
override def satisfies(required: Distribution): Boolean = true
196201

197-
override def compatibleWith(other: Partitioning): Boolean = other match {
198-
case SinglePartition => true
199-
case _ => false
200-
}
201-
}
202-
203-
case object BroadcastPartitioning extends Partitioning {
204-
val numPartitions = 1
205-
206-
override def satisfies(required: Distribution): Boolean = true
202+
override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1
207203

208-
override def compatibleWith(other: Partitioning): Boolean = other match {
209-
case BroadcastPartitioning => true
210-
case _ => false
211-
}
204+
override def guarantees(other: Partitioning): Boolean = other.numPartitions == 1
212205
}
213206

214207
/**
@@ -237,6 +230,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
237230
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
238231
case _ => false
239232
}
233+
234+
override def guarantees(other: Partitioning): Boolean = other match {
235+
case o: HashPartitioning =>
236+
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
237+
case _ => false
238+
}
240239
}
241240

242241
/**
@@ -274,6 +273,11 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
274273
case o: RangePartitioning => this == o
275274
case _ => false
276275
}
276+
277+
override def guarantees(other: Partitioning): Boolean = other match {
278+
case o: RangePartitioning => this == o
279+
case _ => false
280+
}
277281
}
278282

279283
/**
@@ -314,12 +318,19 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
314318
partitionings.exists(_.satisfies(required))
315319

316320
/**
317-
* Returns true if any `partitioning` of this collection guarantees
321+
* Returns true if any `partitioning` of this collection is compatible with
318322
* the given [[Partitioning]].
319323
*/
320324
override def compatibleWith(other: Partitioning): Boolean =
321325
partitionings.exists(_.compatibleWith(other))
322326

327+
/**
328+
* Returns true if any `partitioning` of this collection guarantees
329+
* the given [[Partitioning]].
330+
*/
331+
override def guarantees(other: Partitioning): Boolean =
332+
partitionings.exists(_.guarantees(other))
333+
323334
override def toString: String = {
324335
partitionings.map(_.toString).mkString("(", " or ", ")")
325336
}

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
230230
val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
231231
case (child, requiredDistribution) =>
232232
val targetPartitioning = canonicalPartitioning(requiredDistribution)
233-
if (child.outputPartitioning.compatibleWith(targetPartitioning)) {
233+
if (child.outputPartitioning.guarantees(targetPartitioning)) {
234234
child
235235
} else {
236236
Exchange(targetPartitioning, child)

0 commit comments

Comments
 (0)