From 473d814b9a0c7d353726458e2d2984a23f4714ac Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 18:50:12 -0700
Subject: [PATCH 1/5] [SPARK-9054] [SQL] Rename RowOrdering to
 InterpretedOrdering; use newOrdering everywhere

This patch renames RowOrdering to InterpretedOrdering and updates a few
operators to use SparkPlan.newOrdering instead of manually constructing a
RowOrdering.
---
 .../apache/spark/sql/catalyst/expressions/rows.scala  |  9 +--------
 .../org/apache/spark/sql/execution/Exchange.scala     | 10 +++++-----
 .../org/apache/spark/sql/execution/SparkPlan.scala    |  4 ++--
 .../scala/org/apache/spark/sql/execution/Window.scala |  4 +---
 .../apache/spark/sql/execution/basicOperators.scala   |  2 +-
 .../spark/sql/execution/joins/SortMergeJoin.scala     |  5 +++--
 6 files changed, 13 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 094904bbf9c15..b3975c3af2c39 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -153,7 +153,7 @@ class GenericMutableRow(val values: Array[Any]) extends MutableRow with ArrayBac
   override def copy(): InternalRow = new GenericInternalRow(values.clone())
 }
 
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+class InterpretedOrdering private (ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
 
   def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
     this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -185,10 +185,3 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
     return 0
   }
 }
-
-object RowOrdering {
-  def forSchema(dataTypes: Seq[DataType]): RowOrdering =
-    new RowOrdering(dataTypes.zipWithIndex.map {
-      case(dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
-    })
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4b783e30d95e1..179f61239e6fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -158,16 +158,16 @@ case class Exchange(
         val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
-            iter.map(r => (hashExpressions(r).copy(), r.copy()))
+            iter.map(r => (hashExpressions(r).hashCode, r.copy()))
           }
         } else {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
-            val mutablePair = new MutablePair[InternalRow, InternalRow]()
-            iter.map(r => mutablePair.update(hashExpressions(r), r))
+            val mutablePair = new MutablePair[Int, InternalRow]()
+            iter.map(r => mutablePair.update(hashExpressions(r).hashCode, r))
           }
         }
-        val shuffled = new ShuffledRDD[InternalRow, InternalRow, InternalRow](rdd, part)
+        val shuffled = new ShuffledRDD[Int, InternalRow, InternalRow](rdd, part)
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
@@ -184,7 +184,7 @@ case class Exchange(
           iter.map(row => mutablePair.update(row.copy(), null))
         }
         // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+        implicit val ordering = newOrdering(sortingExpressions, child.output)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
 
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4d7d8626a0ecc..6584a7ec4eb86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -229,11 +229,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
           throw e
         } else {
           log.error("Failed to generate ordering, fallback to interpreted", e)
-          new RowOrdering(order, inputSchema)
+          new InterpretedOrdering(order, inputSchema)
         }
       }
     } else {
-      new RowOrdering(order, inputSchema)
+      new InterpretedOrdering(order, inputSchema)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 6e127e548a120..4d5356a48557d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -119,10 +119,8 @@ case class Window(
   // Although input rows are grouped based on windowSpec.partitionSpec, we need to
   // know when we have a new partition.
   // This is to manually construct an ordering that can be used to compare rows.
-  // TODO: We may want to have a newOrdering that takes BoundReferences.
-  // So, we can take advantave of code gen.
   private val partitionOrdering: Ordering[InternalRow] =
-    RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
+    newOrdering(windowSpec.partitionSpec.map(SortOrder(_, Ascending)), child.output)
 
   // This is used to project expressions for the partition specification.
   protected val partitionGenerator =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4c063c299ba53..48e2e57012bb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -167,7 +167,7 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
+  private val ord: Ordering[InternalRow] = newOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 981447eacad74..a44198f7e5a27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -45,8 +45,9 @@ case class SortMergeJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  // this is to manually construct an ordering that can be used to compare keys from both sides
-  private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType))
+  // Construct an ordering that can be used to compare keys from both sides
+  private val keyOrdering: Ordering[InternalRow] =
+    newOrdering(requiredOrders(leftKeys), left.output)
 
   override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)

From 9d9b092f01c573b73260c6acf69c3190d04f2c8c Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 18:53:01 -0700
Subject: [PATCH 2/5] Back out unrelated Exchange changes, which are slated
 for a separate patch

---
 .../scala/org/apache/spark/sql/execution/Exchange.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 179f61239e6fb..b52dbd352618d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -158,16 +158,16 @@ case class Exchange(
         val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
-            iter.map(r => (hashExpressions(r).hashCode, r.copy()))
+            iter.map(r => (hashExpressions(r).copy, r.copy()))
           }
         } else {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
-            val mutablePair = new MutablePair[Int, InternalRow]()
-            iter.map(r => mutablePair.update(hashExpressions(r).hashCode, r))
+            val mutablePair = new MutablePair[InternalRow, InternalRow]()
+            iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
         }
-        val shuffled = new ShuffledRDD[Int, InternalRow, InternalRow](rdd, part)
+        val shuffled = new ShuffledRDD[InternalRow, InternalRow, InternalRow](rdd, part)
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
From 5eb7222a2a8e4654064bc43facdadefc7e527281 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 18:56:27 -0700
Subject: [PATCH 3/5] Back out unintentional change.
---
 .../main/scala/org/apache/spark/sql/execution/Exchange.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index b52dbd352618d..b8b1b86b7229d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -158,7 +158,7 @@ case class Exchange(
         val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
-            iter.map(r => (hashExpressions(r).copy, r.copy()))
+            iter.map(r => (hashExpressions(r).copy(), r.copy()))
           }
         } else {
           child.execute().mapPartitions { iter =>

From 9e2abbb0bd0d2896139fc90d74b05e296ea91008 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Jul 2015 21:25:52 -0700
Subject: [PATCH 4/5] Add wrapper to allow RangePartitioner's Ordering to be
 serialized

---
 .../apache/spark/sql/execution/Exchange.scala | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 003cf8452d8ea..9ea5b98949999 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.MutablePair
+import org.apache.spark.util.{Utils, MutablePair}
 import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 
 /**
@@ -175,8 +175,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = newOrdering(sortingExpressions, child.output)
+        // This wrapper works around the fact that generated orderings are not Serializable.
+        // Normally we do not run into this problem because the code generation is performed on
+        // the executors, but Spark's RangePartitioner requires a Serializable Ordering to be
+        // created on the driver. This wrapper is an easy workaround to let us use generated
+        // orderings here without having to rewrite or modify RangePartitioner.
+        implicit val ordering = new Ordering[InternalRow] {
+          @transient var _ordering = buildOrdering()
+          override def compare(x: InternalRow, y: InternalRow): Int = _ordering.compare(x, y)
+          def buildOrdering(): Ordering[InternalRow] =
+            newOrdering(sortingExpressions, child.output)
+          private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
+            in.defaultReadObject()
+            _ordering = buildOrdering()
+          }
+        }
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
 
       }

From 15b7d92a47e0364fd71b5153dceadacdade494ed Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 17 Jul 2015 14:49:58 -0700
Subject: [PATCH 5/5] Add wrapper to fix issue in TakeOrderedAndProject

---
 .../spark/sql/execution/basicOperators.scala | 21 +++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 48e2e57012bb7..5e2ec140ba4f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
-import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.spark.util.{CompletionIterator, MutablePair, Utils}
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 /**
@@ -167,7 +167,24 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  private val ord: Ordering[InternalRow] = newOrdering(sortOrder, child.output)
+  private val ord: Ordering[InternalRow] = {
+    // This wrapper works around the fact that generated orderings are not Serializable.
+    // Normally we do not run into this problem because the code generation is performed on
+    // the executors, but Spark's takeOrdered requires a Serializable Ordering to be
+    // created on the driver. This wrapper is an easy workaround to let us use generated
+    // orderings here without having to rewrite or modify takeOrdered.
+    val schema = child.output
+    val sortOrderCopy = sortOrder
+    new Ordering[InternalRow] {
+      @transient var _ordering = buildOrdering()
+      override def compare(x: InternalRow, y: InternalRow): Int = _ordering.compare(x, y)
+      def buildOrdering(): Ordering[InternalRow] = newOrdering(sortOrderCopy, schema)
+      private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
+        in.defaultReadObject()
+        _ordering = buildOrdering()
+      }
+    }
+  }
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
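
A note on the pattern shared by patches 4 and 5: both keep the non-serializable
generated ordering in a @transient field and rebuild it from serializable inputs
(the sort expressions and the child schema) inside a private readObject hook, so
the wrapper can be created on the driver and shipped to executors. Below is a
minimal, self-contained sketch of that pattern; the RebuildableOrdering class,
the Int element type, and the bare readObject body are invented for illustration
and are not part of the patch series, which additionally routes readObject
through Utils.tryOrIOException.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a code-generated, non-serializable ordering: only the
// serializable "recipe" (the ascending flag) is written out; the delegate is rebuilt.
class RebuildableOrdering(ascending: Boolean) extends Ordering[Int] {
  // scala.math.Ordering already extends Serializable, so only the delegate
  // needs to be marked @transient.
  @transient private var delegate: Ordering[Int] = buildDelegate()

  private def buildDelegate(): Ordering[Int] =
    if (ascending) Ordering.Int else Ordering.Int.reverse

  override def compare(x: Int, y: Int): Int = delegate.compare(x, y)

  // Java serialization hook: after the default fields are restored,
  // reconstruct the transient delegate on the receiving side.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    delegate = buildDelegate()
  }
}

object RebuildableOrderingDemo {
  def main(args: Array[String]): Unit = {
    val ord = new RebuildableOrdering(ascending = false)
    // Round-trip through Java serialization, as RangePartitioner or takeOrdered would.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(ord)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = in.readObject().asInstanceOf[Ordering[Int]]
    println(Seq(3, 1, 2).sorted(copy)) // List(3, 2, 1)
  }
}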