@gatorsmile
Member

This PR resolves two issues:

First, it expands * inside struct expressions passed to aggregate functions when using the DataFrame/Dataset APIs. For example,

structDf.groupBy($"a").agg(min(struct($"record.*")))

Second, it improves the error message for invalid star usage in the DataFrame/Dataset APIs. For example,

pagecounts4PartitionsDS
  .map(line => (line._1, line._3))
  .toDF()
  .groupBy($"_1")
  .agg(sum("*") as "sumOccurances")

Before the fix, the invalid usage produced a confusing error message such as:

org.apache.spark.sql.AnalysisException: cannot resolve '_1' given input columns _1, _2;

After the fix, the message is:

org.apache.spark.sql.AnalysisException: Invalid usage of '*' in function 'sum'
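
The two behaviours described above can be sketched with a toy expression tree. This is a minimal illustration in plain Scala, not the PR's code: `Expr`, `Attr`, `Star`, `CreateStruct`, `Func`, `Schema`, `AnalysisError` and `resolveStars` are hypothetical stand-ins for the Catalyst classes, and treating every star passed directly to a function as invalid is a simplifying assumption.

```scala
// Toy model only: these definitions are hypothetical stand-ins, not Spark's Catalyst classes.
object StarExpansionSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Star(prefix: Option[String]) extends Expr             // `*` or `record.*`
  case class CreateStruct(children: List[Expr]) extends Expr
  case class Func(name: String, children: List[Expr]) extends Expr // e.g. min, sum

  // Top-level column names plus the field names of each struct column.
  case class Schema(columns: List[String], structs: Map[String, List[String]])

  final case class AnalysisError(msg: String) extends Exception(msg)

  // The attributes a star stands for.
  def expand(star: Star, schema: Schema): List[Expr] = star.prefix match {
    case Some(p) => schema.structs.getOrElse(p, Nil).map(f => Attr(s"$p.$f"))
    case None    => schema.columns.map(Attr(_))
  }

  def resolveStars(e: Expr, schema: Schema): Expr = e match {
    // A star inside a struct is replaced by the attributes it stands for.
    case CreateStruct(children) =>
      CreateStruct(children.flatMap {
        case s: Star => expand(s, schema)
        case other   => resolveStars(other, schema) :: Nil
      })
    // Simplifying assumption: a star passed directly to a function is rejected.
    case Func(name, children) if children.exists(_.isInstanceOf[Star]) =>
      throw AnalysisError(s"Invalid usage of '*' in function '$name'")
    // Otherwise recurse, so that a struct nested in an aggregate is still expanded.
    case Func(name, children) =>
      Func(name, children.map(resolveStars(_, schema)))
    case other => other
  }
}
```

In this sketch, `min(struct(record.*))` resolves to `min(struct(record.x, record.y))` for a struct with fields `x` and `y`, while `sum(*)` fails with the clearer message instead of an unrelated "cannot resolve" error.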

cc: @rxin @nongli @cloud-fan

case s: Star => s.expand(child, resolver)
case o => o :: Nil
})
case c: CreateStruct if containsStar(c.children) =>
Member Author

Not sure whether there are other functions that can accept a star as an input parameter. If so, I think we should create a trait for all these case classes so that we can remove the duplicate code. Any better ideas? Thanks! : )

@SparkQA

SparkQA commented Feb 15, 2016

Test build #51321 has finished for PR 11208 at commit 3b2b448.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Feb 15, 2016

cc @cloud-fan

case s: Star => s.expand(child, resolver)
case o => o :: Nil
})
case c: CreateStructUnsafe if containsStar(c.children) =>
Contributor

CreateStructUnsafe only appears after unsafe projection, so I think we don't need to handle it in the Analyzer.

Member Author

I saw it being used in two places in the Analyzer. Will remove them. Thanks!

@cloud-fan
Contributor

The PR title looks confusing: star expansion is already done. What this PR does is fix the missing CreateStruct case when handling stars and add a better error message. @gatorsmile could you improve the title to make it clearer?

@gatorsmile
Member Author

Star expansion only works when the star is in an UnresolvedFunction.

So far, Spark SQL does not handle star expansion when we use a star in DataFrame or Dataset functions. That is the reason I chose this title. Let me change it.

Actually, I am not sure if CreateStruct and Count are the only two functions that can accept a star. Could you help me confirm it? Thanks!

@gatorsmile changed the title from "[SPARK-13320] [SQL] Star Expansion for Dataframe/Dataset Functions" to "[SPARK-13320] [SQL] Support Star in CreateStruct and Error Handling when DataFrame/DataSet Functions using Star" on Feb 16, 2016

@cloud-fan
Contributor

Actually, we do handle stars in CreateArray and CreateStruct: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L440-L458. So what you are fixing is the nested CreateStruct; I think we should handle CreateArray too.

One concern: sometimes we check for stars under UnresolvedAlias and sometimes under Alias. It would be good if you could sort this out and make sure no case is missed.

@gatorsmile
Member Author

Ah, I see. The code you posted above is for Project. The error message in the original JIRA is for a star used in Aggregate.

Yeah, we need a clean and complete fix for resolving stars. Let me check if I can move these into expandStarExpressions.

@gatorsmile
Member Author

@cloud-fan The latest commit separates star resolution from reference resolution, since ResolveReferences has become pretty long. Could you help me check whether the new changes cover all the cases that can accept a star? Thank you! : )

@SparkQA

SparkQA commented Feb 19, 2016

Test build #51512 has finished for PR 11208 at commit ac71f39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

""".stripMargin).select($"r.*"),
Row(3, 2) :: Nil)

assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == Row(3, Row(3, 1)))
Contributor

We should write a new test case to test * in CreateStruct and CreateArray, not just add to existing ones.

Member Author

Sure, will do. Thanks!

@cloud-fan
Contributor

Overall LGTM except for some comments about tests. Thanks for working on it!

@SparkQA

SparkQA commented Feb 19, 2016

Test build #51536 has finished for PR 11208 at commit 2c72edf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile changed the title from "[SPARK-13320] [SQL] Support Star in CreateStruct and Error Handling when DataFrame/DataSet Functions using Star" to "[SPARK-13320] [SQL] Support Star in CreateStruct/CreateArray and Error Handling when DataFrame/DataSet Functions using Star" on Feb 20, 2016

@gatorsmile
Member Author

retest this please

val f = udf((a: String) => a)
val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
df.select(struct($"a").as("s")).select(f($"s.a")).collect()
df.select(struct($"*").as("s")).select(f($"s.a")).collect()
Contributor

not needed?

@SparkQA

SparkQA commented Feb 23, 2016

Test build #51701 has finished for PR 11208 at commit 2c72edf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 23, 2016

Test build #51729 has finished for PR 11208 at commit 6b2d609.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
)
case g: Generate if containsStar(g.generator.children) =>
failAnalysis("Cannot explode *, explode can only be applied on a specific column.")
Contributor

Just realized the error message is not clear enough: Generate is not always "explode".

Contributor

do we have a test for this error message?

Member Author

True. I moved this from another rule. I will check the coverage of test cases. Thanks!

Member Author

We already have a test case: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L181-L182

How about changing the message to Invalid usage of '*' in explode/json_tuple/UDTF? Thanks!

Contributor

explode/json_tuple/UDTF LGTM

Member Author

Thanks! Let me change it now.
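
The agreed wording can be illustrated with a small check that does not assume the generator is `explode`. This is a hedged sketch in plain Scala: `Generator`, `containsStar` and `checkGenerator` here are hypothetical toy definitions, and returning an `Either` instead of calling `failAnalysis` is a choice made only to keep the example self-contained.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Generate/Generator classes.
object GenerateCheckSketch {
  sealed trait Expr
  case object Star extends Expr
  case class Attr(name: String) extends Expr

  // `name` could be explode, json_tuple, or a user-defined table function.
  case class Generator(name: String, children: List[Expr])

  def containsStar(exprs: List[Expr]): Boolean = exprs.contains(Star)

  // Left(message) when a star is passed to any generator, Right(generator) otherwise.
  def checkGenerator(g: Generator): Either[String, Generator] =
    if (containsStar(g.children)) Left("Invalid usage of '*' in explode/json_tuple/UDTF")
    else Right(g)
}
```

Because the message names the whole family of generators, it stays accurate whichever generator triggered the check.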

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Mar 12, 2016

Test build #53008 has finished for PR 11208 at commit e060dea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Mar 17, 2016

Test build #53376 has finished for PR 11208 at commit e060dea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

cc @yhuai

@gatorsmile
Member Author

retest this please

@SparkQA

SparkQA commented Mar 19, 2016

Test build #53620 has finished for PR 11208 at commit e060dea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

cc @yhuai

case o => o :: Nil
})
// count(*) has been replaced by count(1)
case o if containsStar(o.children) =>
Contributor

We can have a method:

private def mayContainsStar(expr: Expression): Boolean = expr.isInstanceOf[UnresolvedFunction] || expr.isInstanceOf[CreateStruct]...

then we can simplify this to:

expr.transformUp {
  case e if mayContainsStar(e) =>
    e.copy(children = ...)
}

Member Author

That is a great idea! : )

Member Author

Tried it, but copy cannot be used here. When the static type is Expression (an abstract type), we are unable to use the copy function to change the children. In addition, withNewChildren requires the same number of children. Do you have any idea how to fix it? Thanks!

Contributor

Oh, I see. I don't have a better idea; let's just keep it this way.
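
The limitation discussed in this thread can be reproduced with a toy hierarchy. The following is a minimal sketch under stated assumptions: `Expr`, `Star`, `Attr`, `CreateStruct`, `CreateArray` and `expandChildren` are hypothetical names, not Catalyst's. It shows that `copy` only exists on the concrete case classes, so the rewrite has to match each class separately, and that expansion changes the number of children.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
object CopySketch {
  sealed trait Expr { def children: List[Expr] }
  case object Star extends Expr { val children: List[Expr] = Nil }
  case class Attr(name: String) extends Expr { val children: List[Expr] = Nil }
  case class CreateStruct(children: List[Expr]) extends Expr
  case class CreateArray(children: List[Expr]) extends Expr

  // `copy` is generated per case class, so it is not a member of the abstract `Expr`:
  //   def rebuild(e: Expr, cs: List[Expr]): Expr = e.copy(children = cs) // does not compile
  // Matching on each concrete class makes `copy` available again.
  def expandChildren(e: Expr, columns: List[String]): Expr = {
    // One star child may become several attribute children.
    def expanded(cs: List[Expr]): List[Expr] = cs.flatMap {
      case Star  => columns.map(Attr(_))
      case other => other :: Nil
    }
    e match {
      case s: CreateStruct => s.copy(children = expanded(s.children))
      case a: CreateArray  => a.copy(children = expanded(a.children))
      case other           => other
    }
  }
}
```

In this sketch, a struct with two children (`*` and `z`) over columns `a` and `b` ends up with three children, which is why a replacement that must keep the same number of children does not fit either.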

@cloud-fan
Contributor

Sorry for leaving this sitting for such a long time. Overall LGTM; I will merge it after you address the new comments. Thanks!

@gatorsmile
Member Author

@cloud-fan Thank you for your detailed reviews! I know all of you are very busy. Let me know if anything needs a change. Thanks again!

UnresolvedAlias(child = expandStarExpression(ua.child, p.child)) :: Nil
case a @ Alias(_: UnresolvedFunction | _: CreateArray | _: CreateStruct, _) =>
Alias(child = expandStarExpression(a.child, p.child), a.name)(
isGenerated = a.isGenerated) :: Nil
Contributor

We will lose the qualifier here. How about a.withNewChildren(expandStarExpression(a.child, p.child) :: Nil)?

Member Author

Yeah, a good catch!
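
Why re-constructing an alias can drop the qualifier is easier to see in a toy model. This sketch assumes the qualifier lives in a defaulted secondary parameter list; the `Alias` class and `withNewChild` method below are hypothetical and use strings in place of expressions.

```scala
// Toy model only: a hypothetical Alias, not Spark's.
object AliasSketch {
  // Assumption: metadata such as the qualifier sits in a second parameter list with a
  // default, so building a fresh Alias without passing it resets it to None.
  case class Alias(child: String, name: String)(val qualifier: Option[String] = None) {
    // Rebuild with a new child while carrying the metadata over explicitly.
    def withNewChild(newChild: String): Alias = Alias(newChild, name)(qualifier)
  }
}
```

With `val a = Alias("struct(*)", "s")(Some("t"))`, writing `Alias("struct(a, b)", a.name)()` yields an alias with no qualifier, whereas `a.withNewChild("struct(a, b)")` keeps `Some("t")`. The suggestion in the review follows the same idea of rebuilding from the existing node instead of calling the constructor again.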

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53655 has finished for PR 11208 at commit ba3fe7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
}

test("Star Expansion - CreateStruct and CreateArray") {
Contributor

Why do we put these tests in SQLQuerySuite? It looks like they are mostly testing DF APIs.

Member Author

True, let me move them to DataFrameSuite. Thanks!

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53661 has finished for PR 11208 at commit 0fce075.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53685 has finished for PR 11208 at commit 50abeec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in 3f49e07 on Mar 22, 2016

@cloud-fan
Contributor

thanks! merging to master!

roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
… Handling when DataFrame/DataSet Functions using Star

Author: gatorsmile

Closes apache#11208 from gatorsmile/sumDataSetResolution.
@davies
Contributor

davies commented Mar 25, 2016

@gatorsmile @cloud-fan This PR reverts the change in #3674, which unfortunately affects the unit test in AnalysisSuite: the test breaks once we enforce the max-iteration check in tests. See https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54090/testReport/org.apache.spark.sql.catalyst.analysis/AnalysisSuite/union_project__/

@davies
Contributor

davies commented Mar 25, 2016

@gatorsmile This PR can't be easily reverted, so could you send a PR to fix it?

@davies
Contributor

davies commented Mar 25, 2016

I will fix this in #11828
