From 130bc957b6d8cf1b8b6c1cf77c57eb759fcc6aa6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 6 Dec 2018 23:47:23 +0800 Subject: [PATCH 1/4] improve the doc of Distribution/Partitioning --- .../plans/physical/partitioning.scala | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index cc1a5e835d9cd..c5a34d2082bc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -22,13 +22,12 @@ import org.apache.spark.sql.types.{DataType, IntegerType} /** * Specifies how tuples that share common expressions will be distributed when a query is executed - * in parallel on many machines. Distribution can be used to refer to two distinct physical - * properties: - * - Inter-node partitioning of data: In this case the distribution describes how tuples are - * partitioned across physical machines in a cluster. Knowing this property allows some - * operators (e.g., Aggregate) to perform partition local operations instead of global ones. - * - Intra-partition ordering of data: In this case the distribution describes guarantees made - * about how tuples are distributed within a single partition. + * in parallel on many machines. + * + * Distribution here refers to inter-node partitioning of data: + * - The distribution describes how tuples are partitioned across physical machines in a cluster. + * Knowing this property allows some operators (e.g., Aggregate) to perform partition local + * operations instead of global ones. */ sealed trait Distribution { /** @@ -70,9 +69,7 @@ case object AllTuples extends Distribution { /** * Represents data where tuples that share the same values for the `clustering` - * [[Expression Expressions]] will be co-located. Based on the context, this - * can mean such tuples are either co-located in the same partition or they will be contiguous - * within a single partition. + * [[Expression Expressions]] will be co-located in the same partition. */ case class ClusteredDistribution( clustering: Seq[Expression], @@ -118,10 +115,12 @@ case class HashClusteredDistribution( /** * Represents data where tuples have been ordered according to the `ordering` - * [[Expression Expressions]]. This is a strictly stronger guarantee than - * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the - * same value for the ordering expressions are contiguous and will never be split across - * partitions. + * [[Expression Expressions]]. Its requirement is defined as the following: + * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than or + * equal to any row in the first partition, according to the `ordering` expressions. + * + * In other words, this distribution requires the rows to be ordered across partitions, but not + * necessarily within a partition. */ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { require( @@ -241,12 +240,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) /** * Represents a partitioning where rows are split across partitions based on some total ordering of - * the expressions specified in `ordering`. When data is partitioned in this manner the following - * two conditions are guaranteed to hold: - * - All row where the expressions in `ordering` evaluate to the same values will be in the same - * partition. - * - Each partition will have a `min` and `max` row, relative to the given ordering. All rows - * that are in between `min` and `max` in this `ordering` will reside in this partition. + * the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees: + * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than + * any row in the first partition, according to the `ordering` expressions. + * + * This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as + * there is no overlap between partitions. * * This class extends expression primarily so that transformations over expression will descend * into its child. @@ -262,6 +261,15 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) super.satisfies0(required) || { required match { case OrderedDistribution(requiredOrdering) => + // If `ordering` is a prefix of `requiredOrdering`: + // - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is + // larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So + // `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`. + // + // If `requiredOrdering` is a prefix of `ordering`: + // - Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. If a row is + // larger than another row w.r.t. [a, b, c], this row will not be smaller w.r.t. + // [a. b]. So `RangePartitioning(a, b, c)` satisfy `OrderedDistribution(a, b)`. val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) case ClusteredDistribution(requiredClustering, _) => From adfcec41adbffbef2e33fb85db5ad48eba5f3d71 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 10 Dec 2018 19:54:22 +0800 Subject: [PATCH 2/4] fix typo --- .../spark/sql/catalyst/plans/physical/partitioning.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index c5a34d2082bc6..63b29fa78e06d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -264,12 +264,12 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) // If `ordering` is a prefix of `requiredOrdering`: // - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is // larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So - // `RangePartitioning(a, b)` satisfy `OrderedDistribution(a, b, c)`. + // `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`. // // If `requiredOrdering` is a prefix of `ordering`: // - Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. If a row is // larger than another row w.r.t. [a, b, c], this row will not be smaller w.r.t. - // [a. b]. So `RangePartitioning(a, b, c)` satisfy `OrderedDistribution(a, b)`. + // [a. b]. So `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) case ClusteredDistribution(requiredClustering, _) => From ddb82c3c8822e647790b1a303f647f0bf6dc1c9d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 11 Dec 2018 09:20:18 +0800 Subject: [PATCH 3/4] code style --- .../sql/catalyst/plans/physical/partitioning.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 63b29fa78e06d..f0ed30b3efa7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -24,10 +24,9 @@ import org.apache.spark.sql.types.{DataType, IntegerType} * Specifies how tuples that share common expressions will be distributed when a query is executed * in parallel on many machines. * - * Distribution here refers to inter-node partitioning of data: - * - The distribution describes how tuples are partitioned across physical machines in a cluster. - * Knowing this property allows some operators (e.g., Aggregate) to perform partition local - * operations instead of global ones. + * Distribution here refers to inter-node partitioning of data. That is, it describes how tuples + * are partitioned across physical machines in a cluster. Knowing this property allows some + * operators (e.g., Aggregate) to perform partition local operations instead of global ones. */ sealed trait Distribution { /** @@ -241,8 +240,8 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) /** * Represents a partitioning where rows are split across partitions based on some total ordering of * the expressions specified in `ordering`. When data is partitioned in this manner, it guarantees: - * - Given any 2 adjacent partitions, all the rows of the second partition must be larger than - * any row in the first partition, according to the `ordering` expressions. + * Given any 2 adjacent partitions, all the rows of the second partition must be larger than any row + * in the first partition, according to the `ordering` expressions. * * This is a strictly stronger guarantee than what `OrderedDistribution(ordering)` requires, as * there is no overlap between partitions. From cb94addf126f9885c73c1dc8ead26fbcfece4441 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 12 Dec 2018 13:25:08 +0800 Subject: [PATCH 4/4] improve comment --- .../plans/physical/partitioning.scala | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index f0ed30b3efa7c..17e1cb416fc8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -261,14 +261,21 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) required match { case OrderedDistribution(requiredOrdering) => // If `ordering` is a prefix of `requiredOrdering`: - // - Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. If a row is - // larger than another row w.r.t. [a, b], it's also larger w.r.t. [a, b, c]. So - // `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`. + // Let's say `ordering` is [a, b] and `requiredOrdering` is [a, b, c]. According to the + // RangePartitioning definition, any [a, b] in a previous partition must be smaller + // than any [a, b] in the following partition. This also means any [a, b, c] in a + // previous partition must be smaller than any [a, b, c] in the following partition. + // Thus `RangePartitioning(a, b)` satisfies `OrderedDistribution(a, b, c)`. // // If `requiredOrdering` is a prefix of `ordering`: - // - Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. If a row is - // larger than another row w.r.t. [a, b, c], this row will not be smaller w.r.t. - // [a. b]. So `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. + // Let's say `ordering` is [a, b, c] and `requiredOrdering` is [a, b]. According to the + // RangePartitioning definition, any [a, b, c] in a previous partition must be smaller + // than any [a, b, c] in the following partition. If there is a [a1, b1] from a previous + // partition which is larger than a [a2, b2] from the following partition, then there + // must be a [a1, b1 c1] larger than [a2, b2, c2], which violates RangePartitioning + // definition. So it's guaranteed that, any [a, b] in a previous partition must not be + // greater(i.e. smaller or equal to) than any [a, b] in the following partition. Thus + // `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`. val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) case ClusteredDistribution(requiredClustering, _) =>