From cbb2c597fafff78c9207b733197f6391421bf591 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 13 Dec 2015 17:37:33 -0800 Subject: [PATCH 1/3] Support UnsafeRow in Coalesce/Except/Intersect. --- .../spark/sql/execution/basicOperators.scala | 10 +++++ .../execution/RowFormatConvertersSuite.scala | 42 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a42aea0b96d43..996523a265f0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -250,7 +250,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode { child.execute().coalesce(numPartitions, shuffle = false) } + override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true } /** @@ -263,6 +265,10 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { protected override def doExecute(): RDD[InternalRow] = { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } + + override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) + override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true } /** @@ -275,6 +281,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { protected override def doExecute(): RDD[InternalRow] = { left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } + + override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) + override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 13d68a103a225..4b6d2d51ff926 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -58,6 +58,48 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(!preparedPlan.outputsUnsafeRows) } + test("coalesce can process unsafe rows") { + val plan = Coalesce(1, outputsUnsafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).size === 1) + assert(preparedPlan.outputsUnsafeRows) + } + + test("coalesce can process safe rows") { + val plan = Coalesce(1, outputsSafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).isEmpty) + assert(!preparedPlan.outputsUnsafeRows) + } + + test("except can process unsafe rows") { + val plan = Except(outputsUnsafe, outputsUnsafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).size === 2) + assert(preparedPlan.outputsUnsafeRows) + } + + test("except can process safe rows") { + val plan = Except(outputsSafe, outputsSafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).isEmpty) + assert(!preparedPlan.outputsUnsafeRows) + } + + test("intersect can process unsafe rows") { + val plan = Intersect(outputsUnsafe, outputsUnsafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).size === 2) + assert(preparedPlan.outputsUnsafeRows) + } + + test("intersect can process safe rows") { + val plan = Intersect(outputsSafe, outputsSafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).isEmpty) + assert(!preparedPlan.outputsUnsafeRows) + } + test("execute() fails an assertion if inputs rows are of different formats") { val e = intercept[AssertionError] { Union(Seq(outputsSafe, outputsUnsafe)).execute() From b17ba61a4e6873ed93d6cb9115b58ee78581726a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Dec 2015 11:17:21 -0800 Subject: [PATCH 2/3] Support UnsafeRow in Coalesce/Except/Intersect. --- .../spark/sql/execution/basicOperators.scala | 6 +++--- .../sql/execution/RowFormatConvertersSuite.scala | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 996523a265f0a..b3e4688557ba0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -137,7 +137,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { } } } - override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) + override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows) override def canProcessUnsafeRows: Boolean = true override def canProcessSafeRows: Boolean = true protected override def doExecute(): RDD[InternalRow] = @@ -266,7 +266,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } - override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) + override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows) override def canProcessUnsafeRows: Boolean = true override def canProcessSafeRows: Boolean = true } @@ -282,7 +282,7 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } - override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) + override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows) override def canProcessUnsafeRows: Boolean = true override def canProcessSafeRows: Boolean = true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 4b6d2d51ff926..b7d80b1179a4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -86,6 +86,13 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(!preparedPlan.outputsUnsafeRows) } + test("except requires all of its input rows' formats to agree") { + val plan = Except(outputsSafe, outputsUnsafe) + assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(preparedPlan.outputsUnsafeRows) + } + test("intersect can process unsafe rows") { val plan = Intersect(outputsUnsafe, outputsUnsafe) val preparedPlan = sqlContext.prepareForExecution.execute(plan) @@ -100,6 +107,13 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(!preparedPlan.outputsUnsafeRows) } + test("intersect requires all of its input rows' formats to agree") { + val plan = Intersect(outputsSafe, outputsUnsafe) + assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(preparedPlan.outputsUnsafeRows) + } + test("execute() fails an assertion if inputs rows are of different formats") { val e = intercept[AssertionError] { Union(Seq(outputsSafe, outputsUnsafe)).execute() From b3b70af7fce615454d5bee1f492ce84638f6b045 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Dec 2015 17:20:51 -0800 Subject: [PATCH 3/3] removed unnecessary tests. --- .../execution/RowFormatConvertersSuite.scala | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index b7d80b1179a4b..2328899bb2f8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -65,13 +65,6 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(preparedPlan.outputsUnsafeRows) } - test("coalesce can process safe rows") { - val plan = Coalesce(1, outputsSafe) - val preparedPlan = sqlContext.prepareForExecution.execute(plan) - assert(getConverters(preparedPlan).isEmpty) - assert(!preparedPlan.outputsUnsafeRows) - } - test("except can process unsafe rows") { val plan = Except(outputsUnsafe, outputsUnsafe) val preparedPlan = sqlContext.prepareForExecution.execute(plan) @@ -79,13 +72,6 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(preparedPlan.outputsUnsafeRows) } - test("except can process safe rows") { - val plan = Except(outputsSafe, outputsSafe) - val preparedPlan = sqlContext.prepareForExecution.execute(plan) - assert(getConverters(preparedPlan).isEmpty) - assert(!preparedPlan.outputsUnsafeRows) - } - test("except requires all of its input rows' formats to agree") { val plan = Except(outputsSafe, outputsUnsafe) assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows) @@ -100,13 +86,6 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(preparedPlan.outputsUnsafeRows) } - test("intersect can process safe rows") { - val plan = Intersect(outputsSafe, outputsSafe) - val preparedPlan = sqlContext.prepareForExecution.execute(plan) - assert(getConverters(preparedPlan).isEmpty) - assert(!preparedPlan.outputsUnsafeRows) - } - test("intersect requires all of its input rows' formats to agree") { val plan = Intersect(outputsSafe, outputsUnsafe) assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)