From d163b575466094304a6740bbada5ba7000d893f9 Mon Sep 17 00:00:00 2001 From: caoxuewen Date: Wed, 9 Aug 2017 17:04:37 +0800 Subject: [PATCH] Improve a special case for non-deterministic projects and filters in optimizer --- .../sql/catalyst/optimizer/Optimizer.scala | 12 +++++- .../optimizer/CollapseProjectSuite.scala | 24 +++++++++++ .../optimizer/ColumnPruningSuite.scala | 42 +++++++++++++++++++ 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a51b385399d8..f73e8a3e925f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -495,8 +495,14 @@ object ColumnPruning extends Rule[LogicalPlan] { // Eliminate no-op Projects case p @ Project(_, child) if sameOutput(child.output, p.output) => child - // Can't prune the columns on LeafNode - case p @ Project(_, _: LeafNode) => p + // Can't prune the columns on LeafNode for deterministic projects + case p @ Project(fields, _: LeafNode) if fields.forall(_.deterministic) => p + + // The fields of the Project contain non-deterministic expressions, + // e.g. Rand. The Project will be split into two Projects. 
+ case p @ Project(fields, child) + if !fields.forall(_.deterministic) && (child.outputSet -- p.references).nonEmpty => + p.copy(child = prunedChild(child, p.references)) // for all other logical plans that inherits the output from it's children case p @ Project(_, child) => @@ -535,6 +541,8 @@ object ColumnPruning extends Rule[LogicalPlan] { object CollapseProject extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case p1 @ Project(fields, p2 @ Project(_, _: LeafNode)) + if !fields.forall(_.deterministic) && p2.references.nonEmpty => p1 case p1 @ Project(_, p2: Project) => if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) { p1 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala index 587437e9aa81..35d7b961f6ab 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala @@ -119,4 +119,28 @@ class CollapseProjectSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("SPARK-21520 collapse project contains nondeterministic, dependent into one deterministic") { + val query = testRelation + .select(('a).as('a_plus)) + .select(Rand(10).as('rand), 'a_plus) + + val optimized = Optimize.execute(query.analyze) + + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("SPARK-21520 collapse project into one nondeterministic, dependent into two deterministic") { + val query = testRelation + .select(('a ).as('a_plus), ('b).as('b_plus)) + .select(Rand(10).as('rand)) + + val optimized = Optimize.execute(query.analyze) + + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 77e4eff26c69..3d4f2c509f70 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -360,5 +360,47 @@ class ColumnPruningSuite extends PlanTest { comparePlans(optimized2, expected2.analyze) } + test("SPARK-21520 the fields of project only is nondeterministic") { + val testRelation = LocalRelation('key.int, 'value.string) + + // All the fields of the Project are non-deterministic expressions. + val originalQuery = + testRelation + .select(Rand(10).as("rand")) + + val optimized = Optimize.execute(originalQuery.analyze) + + // correctAnswer: + // Project [rand(10) AS rand#5] + // +- LocalRelation , [key#0, value#1] + val correctAnswer = originalQuery.analyze + + comparePlans(optimized, correctAnswer) + } + + test("SPARK-21520 the fields of project contains nondeterministic") { + val testRelation = LocalRelation('key.int, 'value.string) + + // The fields of the Project contain non-deterministic expressions, + // e.g. the Rand function. The Project will be split into two Projects. + val originalQuery = + testRelation + .select($"key".as("key_a"), Rand(10).as("rand")) + + val optimized = Optimize.execute(originalQuery.analyze) + + // correctAnswer: + // Project [key#0 AS key_a#4, rand(10) AS rand#5] + // +- Project [key#0] + // +- LocalRelation , [key#0, value#1] + val correctAnswer = + testRelation + .select($"key") + .select($"key".as("key_a"), Rand(10).as("rand")) + .analyze + + comparePlans(optimized, correctAnswer) + } + // todo: add more tests for column pruning }