[SPARK-9393] [SQL] Fix several error-handling bugs in ScriptTransform operator #7710
Changes from all commits: fa18d26, b43e4ec, bd4c948, 4ee36a2, 8b162b6, 88278de, b31258d, 323bb2b, 494cde0, 6a06a8c, 983f200, 16c44e2
Changes to the `SparkPlanTest` test utility:

```diff
@@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.{DataFrame, DataFrameHolder, Row}
+import org.apache.spark.sql.{SQLContext, DataFrame, DataFrameHolder, Row}

 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
```
```diff
@@ -33,11 +33,13 @@ import scala.util.control.NonFatal
  */
 class SparkPlanTest extends SparkFunSuite {

+  protected def sqlContext: SQLContext = TestSQLContext
+
   /**
    * Creates a DataFrame from a local Seq of Product.
    */
   implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder = {
-    TestSQLContext.implicits.localSeqToDataFrameHolder(data)
+    sqlContext.implicits.localSeqToDataFrameHolder(data)
   }

   /**
```
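The new overridable `sqlContext` hook is the core of this change. As a hedged sketch of how it could be used (the suite name below is hypothetical and not part of this PR; `TestHive` is Spark's Hive test singleton):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.test.TestHive

// Hypothetical suite: overriding the hook makes every SparkPlanTest helper
// run against TestHive's single shared SparkContext instead of TestSQLContext.
class HiveScriptTransformSuite extends SparkPlanTest {
  override protected def sqlContext: SQLContext = TestHive
}
```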
```diff
@@ -98,7 +100,7 @@ class SparkPlanTest extends SparkFunSuite {
       planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row],
       sortAnswers: Boolean = true): Unit = {
-    SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers) match {
+    SparkPlanTest.checkAnswer(input, planFunction, expectedAnswer, sortAnswers, sqlContext) match {
       case Some(errorMessage) => fail(errorMessage)
       case None =>
     }
```

Review thread on the `checkAnswer` call:

**Contributor:** If […]

**Author:** This is a call on the […]

**Contributor:** This is minor, and does not have to be addressed in this PR, but why did we make that an […]

**Author:** I think that the idea was to allow re-use of some of the SparkPlanTest functions without having to mix them into the test suite; I agree that this could maybe be simplified by making both SparkPlanTest and QueryTest into traits / mixins so that you can easily use both in the same test suite.
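For illustration of the mixin idea floated above (purely hypothetical; this refactoring is not part of the PR), the helpers could be expressed as a self-typed trait so a suite can combine them with `QueryTest`:

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.SQLContext

// Hypothetical refactoring sketch: helpers as a trait instead of a base class.
trait PlanTestHelpers { self: SparkFunSuite =>
  protected def sqlContext: SQLContext
}

// A suite could then stack helper traits:
//   class MySuite extends QueryTest with PlanTestHelpers { ... }
```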
```diff
@@ -121,7 +123,8 @@ class SparkPlanTest extends SparkFunSuite {
       planFunction: SparkPlan => SparkPlan,
       expectedPlanFunction: SparkPlan => SparkPlan,
       sortAnswers: Boolean = true): Unit = {
-    SparkPlanTest.checkAnswer(input, planFunction, expectedPlanFunction, sortAnswers) match {
+    SparkPlanTest.checkAnswer(
+      input, planFunction, expectedPlanFunction, sortAnswers, sqlContext) match {
       case Some(errorMessage) => fail(errorMessage)
       case None =>
     }
```
```diff
@@ -147,13 +150,14 @@ object SparkPlanTest {
       input: DataFrame,
       planFunction: SparkPlan => SparkPlan,
       expectedPlanFunction: SparkPlan => SparkPlan,
-      sortAnswers: Boolean): Option[String] = {
+      sortAnswers: Boolean,
+      sqlContext: SQLContext): Option[String] = {

     val outputPlan = planFunction(input.queryExecution.sparkPlan)
     val expectedOutputPlan = expectedPlanFunction(input.queryExecution.sparkPlan)

     val expectedAnswer: Seq[Row] = try {
-      executePlan(expectedOutputPlan)
+      executePlan(expectedOutputPlan, sqlContext)
     } catch {
       case NonFatal(e) =>
         val errorMessage =
```
```diff
@@ -168,7 +172,7 @@ object SparkPlanTest {
     }

     val actualAnswer: Seq[Row] = try {
-      executePlan(outputPlan)
+      executePlan(outputPlan, sqlContext)
     } catch {
       case NonFatal(e) =>
         val errorMessage =
```
```diff
@@ -207,12 +211,13 @@ object SparkPlanTest {
       input: Seq[DataFrame],
       planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row],
-      sortAnswers: Boolean): Option[String] = {
+      sortAnswers: Boolean,
+      sqlContext: SQLContext): Option[String] = {

     val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))

     val sparkAnswer: Seq[Row] = try {
-      executePlan(outputPlan)
+      executePlan(outputPlan, sqlContext)
     } catch {
       case NonFatal(e) =>
         val errorMessage =
```
```diff
@@ -275,10 +280,10 @@ object SparkPlanTest {
     }
   }

-  private def executePlan(outputPlan: SparkPlan): Seq[Row] = {
+  private def executePlan(outputPlan: SparkPlan, sqlContext: SQLContext): Seq[Row] = {
     // A very simple resolver to make writing tests easier. In contrast to the real resolver
     // this is always case sensitive and does not try to handle scoping or complex type resolution.
-    val resolvedPlan = TestSQLContext.prepareForExecution.execute(
+    val resolvedPlan = sqlContext.prepareForExecution.execute(
       outputPlan transform {
         case plan: SparkPlan =>
           val inputMap = plan.children.flatMap(_.output).map(a => (a.name, a)).toMap
```
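With the context now threaded through explicitly, a direct call to the object-level helper from inside a suite extending `SparkPlanTest` looks roughly like the following sketch (the DataFrame contents and the identity plan function are illustrative):

```scala
import org.apache.spark.sql.Row

// Illustrative only: run a trivial plan through the object-level helper,
// passing the SQLContext explicitly instead of relying on TestSQLContext.
val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b")))
val failureMessage: Option[String] = SparkPlanTest.checkAnswer(
  input = Seq(df),
  planFunction = plans => plans.head,          // identity transformation
  expectedAnswer = Seq(Row(1, "a"), Row(2, "b")),
  sortAnswers = true,
  sqlContext = sqlContext)
failureMessage.foreach(msg => sys.error(msg))  // None means the check passed
```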
Changes to the Hive query parser's `matchSerDe` helper:
```diff
@@ -874,15 +874,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
   }

   def matchSerDe(clause: Seq[ASTNode])
-      : (Seq[(String, String)], String, Seq[(String, String)]) = clause match {
+      : (Seq[(String, String)], Option[String], Seq[(String, String)]) = clause match {
     case Token("TOK_SERDEPROPS", propsClause) :: Nil =>
       val rowFormat = propsClause.map {
         case Token(name, Token(value, Nil) :: Nil) => (name, value)
       }
-      (rowFormat, "", Nil)
+      (rowFormat, None, Nil)

     case Token("TOK_SERDENAME", Token(serdeClass, Nil) :: Nil) :: Nil =>
-      (Nil, serdeClass, Nil)
+      (Nil, Some(BaseSemanticAnalyzer.unescapeSQLString(serdeClass)), Nil)

     case Token("TOK_SERDENAME", Token(serdeClass, Nil) ::
            Token("TOK_TABLEPROPERTIES",
```

Review thread on the switch from `""` to `None`:

**Author:** I thought it was confusing to use an empty string to represent a missing value, hence this change.

**Contributor:** Could we use […]

**Author:** We could but I feel that's a bit less clear and more error-prone.

**Contributor:** +1 to this change.
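A small illustration of why the `Option[String]` signature is safer than the empty-string sentinel (the SerDe class name below is just an example):

```scala
// With Option, "no SerDe specified" is a distinct, type-checked case rather
// than a magic "" that callers might forget to test for.
val noSerde: Option[String] = None
val lazySerde: Option[String] =
  Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")

lazySerde match {
  case Some(cls) => println(s"instantiate SerDe $cls")
  case None      => println("no SerDe clause: use the default")
}
```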
```diff
@@ -891,9 +891,9 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case Token("TOK_TABLEPROPERTY", Token(name, Nil) :: Token(value, Nil) :: Nil) =>
           (name, value)
       }
-      (Nil, serdeClass, serdeProps)
+      (Nil, Some(BaseSemanticAnalyzer.unescapeSQLString(serdeClass)), serdeProps)

-    case Nil => (Nil, "", Nil)
+    case Nil => (Nil, None, Nil)
   }

   val (inRowFormat, inSerdeClass, inSerdeProps) = matchSerDe(inputSerdeClause)
```
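The added `unescapeSQLString` call matters because the raw AST token still carries the quotes from the query text; a rough illustration (the literal is chosen for the example):

```scala
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer

// The parser hands back the SerDe name as a quoted SQL string literal;
// unescaping yields the bare class name to load.
val raw = "'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"
val cls = BaseSemanticAnalyzer.unescapeSQLString(raw)
assert(cls == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
```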
Closing review note: The changes here in SparkPlanTest are to allow it to be used with TestHive without leading to the dreaded "multiple active SparkContexts in the same JVM" error.
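For context, a hedged sketch of the failure mode being avoided: constructing a second SparkContext in one JVM (with the default `spark.driver.allowMultipleContexts=false`) throws.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: TestSQLContext and TestHive each own a SparkContext,
// so instantiating both in one test JVM used to trip this check.
val first = new SparkContext(new SparkConf().setMaster("local").setAppName("first"))
val second = new SparkContext(new SparkConf().setMaster("local").setAppName("second"))
// => org.apache.spark.SparkException:
//    Only one SparkContext may be running in this JVM (see SPARK-2243)
```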