Conversation

@hvanhovell
Contributor

This PR adds support for multiple columns in a single count distinct aggregate to the new aggregation path.

cc @yhuai

@rxin
Contributor

rxin commented Nov 5, 2015

Can you add some test cases for this?

Contributor Author

We need to check whether the no-structs-allowed-in-grouping-keys policy will create a problem for us when we try to use this in a multiple distinct setting.

@yhuai
Contributor

yhuai commented Nov 6, 2015

ok to test

@yhuai
Contributor

yhuai commented Nov 6, 2015

test this please

@SparkQA

SparkQA commented Nov 6, 2015

Test build #45237 has started for PR 9409 at commit aa2ac6b.

@marmbrus
Contributor

marmbrus commented Nov 6, 2015

test this please

Contributor

This needs to be child.eval(input).asInstanceOf[InternalRow].

Contributor

Could you also add a test in ConditionalExpressionSuite? I think that would have caught this bug.

Contributor Author

I'll fix this and add a test tomorrow morning.

Contributor Author

So it turns out this is actually correct. The eval method of UnaryExpression will call eval on the child expression and pass the result to the nullSafeEval method if it is not null.
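For readers following along, the contract being described can be sketched in a few lines of standalone Scala. This is a simplified illustration, not the actual Catalyst source; the real classes live in org.apache.spark.sql.catalyst.expressions.

```scala
// Simplified, standalone sketch of Catalyst's UnaryExpression eval contract.
trait Expr { def eval(input: Any): Any }

abstract class UnaryExpr extends Expr {
  def child: Expr
  // eval performs the null check, so nullSafeEval only ever sees a
  // non-null child result and can cast it safely.
  override def eval(input: Any): Any = {
    val value = child.eval(input)
    if (value == null) null else nullSafeEval(value)
  }
  protected def nullSafeEval(value: Any): Any
}

// Tiny example expressions to exercise the contract.
case class Lit(v: Any) extends Expr { override def eval(input: Any): Any = v }
case class Negate(child: Expr) extends UnaryExpr {
  override protected def nullSafeEval(value: Any): Any =
    -(value.asInstanceOf[Int])
}
```

Because the null check lives in eval, a cast inside nullSafeEval never sees a null child result, which is why the code under review is correct as written.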

Contributor

oh, you are right. I did not notice that.

@marmbrus
Contributor

marmbrus commented Nov 6, 2015

This is great. Only minor comments.

@marmbrus
Contributor

marmbrus commented Nov 6, 2015

add to whitelist

@marmbrus
Contributor

marmbrus commented Nov 6, 2015

ok to test

@hvanhovell
Contributor Author

test this please

1 similar comment
@yhuai
Contributor

yhuai commented Nov 7, 2015

test this please

@SparkQA

SparkQA commented Nov 7, 2015

Test build #2004 has started for PR 9409 at commit 8538e11.

Contributor Author

@yhuai if we combine this with the distinct rewriting rule, it will add a struct to the groupBy clause of the first aggregate. This is currently not allowed in the new UDAF path, so it will fall back to the old path. For example:

val data2 = Seq[(Integer, Integer, Integer)](
    (1, 10, -10),
    (null, -60, 60),
    (1, 30, -30),
    (1, 30, 30),
    (2, 1, 1),
    (null, -10, 10),
    (2, -1, null),
    (2, 1, 1),
    (2, null, 1),
    (null, 100, -10),
    (3, null, 3),
    (null, null, null),
    (3, null, null)).toDF("key", "value1", "value2")
data2.registerTempTable("agg2")

val q = sql(
 """
 |SELECT
 |  key,
 |  count(distinct value1),
 |  count(distinct value2),
 |  count(distinct value1, value2)
 |FROM agg2
 |GROUP BY key
 """.stripMargin)

This produces the following physical plan:

== Physical Plan ==
TungstenAggregate(key=[key#3], functions=[(count(if ((gid#44 = 1)) attributereference#45 else null),mode=Final,isDistinct=false),(count(if ((gid#44 = 3)) attributereference#47 else null),mode=Final,isDistinct=false),(count(if ((gid#44 = 2)) dropanynull#46 else null),mode=Final,isDistinct=false)], output=[key#3,_c1#32L,_c2#33L,_c3#34L])
 TungstenExchange(Shuffle without coordinator) hashpartitioning(key#3,200), None
  TungstenAggregate(key=[key#3], functions=[(count(if ((gid#44 = 1)) attributereference#45 else null),mode=Partial,isDistinct=false),(count(if ((gid#44 = 3)) attributereference#47 else null),mode=Partial,isDistinct=false),(count(if ((gid#44 = 2)) dropanynull#46 else null),mode=Partial,isDistinct=false)], output=[key#3,count#49L,count#53L,count#51L])
   Aggregate false, [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44], [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44]
    ConvertToSafe
     TungstenExchange(Shuffle without coordinator) hashpartitioning(key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44,200), None
      ConvertToUnsafe
       Aggregate true, [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44], [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44]
        !Expand [List(key#3, value1#4, null, null, 1),List(key#3, null, dropanynull(struct(value1#4,value2#5)), null, 2),List(key#3, null, null, value2#5, 3)], [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44]
         LocalTableScan [key#3,value1#4,value2#5], [[1,10,-10],[null,-60,60],[1,30,-30],[1,30,30],[2,1,1],[null,-10,10],[2,-1,null],[2,1,1],[2,null,1],[null,100,-10],[3,null,3],[null,null,null],[3,null,null]]

Is it possible to add support for fixed width structs as group by expression to the new aggregation path?
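To make the rewrite in the plan above concrete, here is a plain-Scala simulation (illustrative only, not Spark code): every input row is expanded into one row per distinct aggregate and tagged with a group id (gid); the first aggregate deduplicates on (key, value, gid); the final aggregate then turns each distinct count into a plain count over the rows with the matching gid.

```scala
// Plain-Scala simulation of the multi-distinct rewrite shown in the plan.
case class Row(key: Option[Int], v1: Option[Int], v2: Option[Int])

def multiDistinctCounts(rows: Seq[Row]): Map[Option[Int], (Int, Int, Int)] = {
  // Expand: gid 1 -> value1, gid 2 -> struct(value1, value2), gid 3 -> value2.
  // The for-comprehension plays the role of DropAnyNull: the pair is kept
  // only when neither field is null.
  val expanded: Seq[(Option[Int], Option[Any], Int)] = rows.flatMap { r =>
    Seq(
      (r.key, r.v1, 1),
      (r.key, for (a <- r.v1; b <- r.v2) yield (a, b), 2),
      (r.key, r.v2, 3))
  }
  // First aggregate: deduplicate on (key, value, gid) -- the "distinct" step.
  val deduped = expanded.distinct
  // Final aggregate: count non-null values per gid within each key group.
  deduped.groupBy(_._1).map { case (key, rs) =>
    def cnt(gid: Int) = rs.count(t => t._3 == gid && t._2.nonEmpty)
    key -> (cnt(1), cnt(2), cnt(3))
  }
}
```

The struct projected for gid 2 is exactly what ends up in the grouping key of the first aggregate, which is why the no-structs restriction matters here.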

Contributor Author

Quick follow-up.

Allowing structs does not seem to create a problem. I disabled this line locally: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala#L36. With that change, it uses the TungstenAggregate path.

Contributor

oh, yes. Based on https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L240-L266, we can compare two struct values. It looks like only array and map types are not handled there. So, I think we can visit all data types of a struct, and if it does not contain an array or a map, we can use the new agg code path. Can you update Utils.scala? I am also thinking that if an array or a map appears in the grouping expressions, we should throw an analysis error saying it is not allowed right now.
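The suggested check amounts to a recursive walk over the data type. The sketch below is hypothetical (helper and type names are illustrative, using a minimal standalone DataType model rather than Spark's); the real change belongs in Utils.scala.

```scala
// Minimal standalone model of the relevant data types, for illustration.
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType
case class ArrayType(element: DataType) extends DataType
case class MapType(key: DataType, value: DataType) extends DataType
case class StructType(fields: Seq[DataType]) extends DataType

// A type can serve as a grouping key as long as no nested field
// is an array or a map.
def supportedAsGroupingKey(dt: DataType): Boolean = dt match {
  case ArrayType(_) | MapType(_, _) => false  // codegen cannot compare these
  case StructType(fields)           => fields.forall(supportedAsGroupingKey)
  case _                            => true   // atomic types compare fine
}
```

A flat struct of atomic fields passes the check, while any struct containing an array or map, however deeply nested, is rejected.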

Contributor Author

Added (proper) StructType checking.

Do you want me to also start throwing AnalysisErrors?

Contributor

I can make the change to throw an analysis error in my PR.

@hvanhovell
Contributor Author

test this please

1 similar comment
@yhuai
Contributor

yhuai commented Nov 7, 2015

test this please

@yhuai
Contributor

yhuai commented Nov 7, 2015

retest this please

@yhuai
Contributor

yhuai commented Nov 7, 2015

hmm... it seems Jenkins did not pick up this PR.

@yhuai
Contributor

yhuai commented Nov 7, 2015

Hope https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2005/ can finish without any problem.

@hvanhovell
Contributor Author

@yhuai
Contributor

yhuai commented Nov 7, 2015

oh, right, I pasted the wrong link.

@hvanhovell
Contributor Author

Seems like I have broken something. I'll need to rebase anyway.

@SparkQA

SparkQA commented Nov 7, 2015

Test build #2005 has finished for PR 9409 at commit 5c46cec.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DropAnyNull(child: Expression) extends UnaryExpression with ExpectsInputTypes

@yhuai
Contributor

yhuai commented Nov 8, 2015

Looks like the failed test exposed a problem in our rewriter? The differing results all come from regular count.

@hvanhovell
Contributor Author

test this please

@hvanhovell
Contributor Author

retest this please

@hvanhovell
Contributor Author

Jenkins does not like me...

@hvanhovell
Contributor Author

@yhuai can you get jenkins to test this?

The bug exposed by this patch affected the regular aggregation path: as soon as we used more than one regular aggregate, there was a chance that an attribute and its source expression would get misaligned. This has been fixed, and I have also added a test for this situation.

If we choose not to add this to the 1.6 branch, then we have to create a separate PR containing only the bugfix and get that one in.

@yhuai
Contributor

yhuai commented Nov 8, 2015

test this please

@yhuai
Contributor

yhuai commented Nov 8, 2015

ok to test

@yhuai
Contributor

yhuai commented Nov 8, 2015

add to whitelist

@yhuai
Contributor

yhuai commented Nov 8, 2015

Not sure why Jenkins did not get triggered after you updated the PR. Let's try to get this into branch 1.6, since it is needed to remove the old agg path.

@SparkQA

SparkQA commented Nov 8, 2015

Test build #2012 has finished for PR 9409 at commit 4e53aab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class DropAnyNull(child: Expression) extends UnaryExpression with ExpectsInputTypes

@yhuai
Contributor

yhuai commented Nov 8, 2015

LGTM. Merging to master and branch 1.6

asfgit pushed a commit that referenced this pull request Nov 8, 2015
This PR adds support for multiple columns in a single count distinct aggregate to the new aggregation path.

cc yhuai

Author: Herman van Hovell <[email protected]>

Closes #9409 from hvanhovell/SPARK-11451.

(cherry picked from commit 30c8ba7)
Signed-off-by: Yin Huai <[email protected]>
@asfgit asfgit closed this in 30c8ba7 Nov 8, 2015
asfgit pushed a commit that referenced this pull request Nov 29, 2015
In #9409 we enabled multi-column counting. The approach taken in that PR introduces a bit of overhead by first creating a row only to check whether all of the columns are non-null.

This PR fixes that technical debt. Count now takes multiple columns as its input. In order to make this work I have also added support for multiple columns in the single distinct code path.

cc yhuai

Author: Herman van Hovell <[email protected]>

Closes #10015 from hvanhovell/SPARK-12024.

(cherry picked from commit 3d28081)
Signed-off-by: Yin Huai <[email protected]>