Skip to content

Conversation

@hvanhovell
Copy link
Contributor

This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.

This has the following advantages:

  • Better memory management.
  • The ability to use spark UDAFs in Window functions.

cc @rxin / @yhuai

@yhuai
Copy link
Contributor

yhuai commented Nov 18, 2015

test this please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a small trick to allow us to add the ImperativeAggregate to the evaluation projection. The advantage of this is that we are avoiding the use of the relatively expensive generic update method and that we don't have to use a seperate indices array to keep track of the location to store the evaluation result.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recently I also tried this trick, but failed, because the eval() usually only use attributes in the buffer, but BoundReference will try to look attributes for child of AggregateFunction, which may not exists.

Could you have a test case for it? (using AggregateFunction as window function)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be fine as long as we add already bound ImperativeAggregates to the projection to be code generated. Unbound ImperativeAggregates will cause alot of trouble.

I use HyperLogLogPlusPlus in the last test in the DataFrameWindowFunctionSuite: https://github.com/hvanhovell/spark/blob/SPARK-8641-2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala#L222 is this enough?

@SparkQA
Copy link

SparkQA commented Nov 19, 2015

Test build #46251 has finished for PR 9819 at commit b66ef4d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * trait WindowFunction extends Expression\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist(n: Expression)\n * case class NTile(buckets: Expression, n: Expression)\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(order: Seq[Expression]) extends RankLike\n * case class DenseRank(order: Seq[Expression]) extends RankLike\n * case class PercentRank(order: Seq[Expression], n: Expression) extends RankLike\n

@SparkQA
Copy link

SparkQA commented Nov 19, 2015

Test build #46331 has finished for PR 9819 at commit 6ebee15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * trait WindowFunction extends Expression\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist(n: Expression)\n * case class NTile(buckets: Expression, n: Expression)\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(order: Seq[Expression]) extends RankLike\n * case class DenseRank(order: Seq[Expression]) extends RankLike\n * case class PercentRank(order: Seq[Expression], n: Expression) extends RankLike\n

@SparkQA
Copy link

SparkQA commented Nov 19, 2015

Test build #46338 has finished for PR 9819 at commit b3f5a39.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * trait WindowFunction extends Expression\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist(n: Expression)\n * case class NTile(buckets: Expression, n: Expression)\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(order: Seq[Expression]) extends RankLike\n * case class DenseRank(order: Seq[Expression]) extends RankLike\n * case class PercentRank(order: Seq[Expression], n: Expression) extends RankLike\n

@SparkQA
Copy link

SparkQA commented Nov 19, 2015

Test build #46346 has finished for PR 9819 at commit e95c42e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * trait WindowFunction extends Expression\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist(n: Expression)\n * case class NTile(buckets: Expression, n: Expression)\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(order: Seq[Expression]) extends RankLike\n * case class DenseRank(order: Seq[Expression]) extends RankLike\n * case class PercentRank(order: Seq[Expression], n: Expression) extends RankLike\n

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withNewChildren does not work with AggregateExpression; I am working arround that here.

@SparkQA
Copy link

SparkQA commented Nov 27, 2015

Test build #46820 has finished for PR 9819 at commit 31c6fb3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Copy link
Contributor

yhuai commented Nov 29, 2015

@hvanhovell Thank you for the PR! Just a quick heads up. We will allocate time to review during next week (and the week after if we need more time to work on it).

@yhuai
Copy link
Contributor

yhuai commented Nov 29, 2015

One quick question. With this PR, is it possible to use any Spark SQL's aggregate function as a window function?

@hvanhovell
Copy link
Contributor Author

Yes. You can use any Spark aggregate function as a window function. Most Hive UDAFs should also work except for the pivoted ones...

@SparkQA
Copy link

SparkQA commented Dec 1, 2015

Test build #46998 has finished for PR 9819 at commit d7f13a0.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * trait WindowFunction extends Expression\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist(n: Expression)\n * case class NTile(buckets: Expression, n: Expression)\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(order: Seq[Expression]) extends RankLike\n * case class DenseRank(order: Seq[Expression]) extends RankLike\n * case class PercentRank(order: Seq[Expression], n: Expression) extends RankLike\n

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check if buckets is a foldable expression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Yes and No. The buckets value only has to be constant within a partition, it would also work if the value is part of the partitioning clause. It is - however - quite a bit of work to get that in. For now I'd rather enforce a global constant number of buckets. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I somehow missed case x => throw new AnalysisException(... Sorry.

It makes sense. Let's keep it as is.

@yhuai
Copy link
Contributor

yhuai commented Dec 13, 2015

Do we have a test case that uses a UDAF as window function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we need to say it is also used as the default frame?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, is it used as the default frame?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit more strict than that. It is the only frame in which a WindowFunction is supposed to be evaluated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see.

@yhuai
Copy link
Contributor

yhuai commented Dec 13, 2015

Can you add scala doc to explain how we evaluate an regular agg function when it is used as a window function? (Maybe I missed it)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 spaces? Or the colon without a result?

@yhuai
Copy link
Contributor

yhuai commented Dec 14, 2015

@hvanhovell This is very cool! I have finished my review.

@davies
Copy link
Contributor

davies commented Dec 14, 2015

LGTM

@SparkQA
Copy link

SparkQA commented Dec 14, 2015

Test build #47676 has finished for PR 9819 at commit b4d9ca9.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * case class UnresolvedWindowExpression(\n * case class WindowExpression(\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction\n * case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(children: Seq[Expression]) extends RankLike\n * case class DenseRank(children: Seq[Expression]) extends RankLike\n * case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction\n

@hvanhovell
Copy link
Contributor Author

Build failed due to R versioning problem. I'll try again when this is sorted out.

@hvanhovell
Copy link
Contributor Author

@yhuai I fixed/addressed/improved most of the things you have raised. Two things worth pointing out:

You can find the test for UDAF here: https://github.com/hvanhovell/spark/blob/SPARK-8641-2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala#L240-L294

You can find the documentation on how we evaluate a regular AggregateFunction here: https://github.com/hvanhovell/spark/blob/SPARK-8641-2/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala#L705-L712

@hvanhovell
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Dec 15, 2015

Test build #47685 has finished for PR 9819 at commit b4d9ca9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * case class UnresolvedWindowExpression(\n * case class WindowExpression(\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction\n * case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(children: Seq[Expression]) extends RankLike\n * case class DenseRank(children: Seq[Expression]) extends RankLike\n * case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction\n

@SparkQA
Copy link

SparkQA commented Dec 15, 2015

Test build #47726 has finished for PR 9819 at commit c181c8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback\n * case class UnresolvedWindowExpression(\n * case class WindowExpression(\n * case class Lead(input: Expression, offset: Expression, default: Expression)\n * case class Lag(input: Expression, offset: Expression, default: Expression)\n * abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction\n * abstract class RowNumberLike extends AggregateWindowFunction\n * trait SizeBasedWindowFunction extends AggregateWindowFunction\n * case class RowNumber() extends RowNumberLike\n * case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction\n * case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction\n * abstract class RankLike extends AggregateWindowFunction\n * case class Rank(children: Seq[Expression]) extends RankLike\n * case class DenseRank(children: Seq[Expression]) extends RankLike\n * case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction\n

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we can add some comments to explain how it works in a follow-up PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some documentation on all the window functions. The inner workings of ntile in particular need some documentation.

@yhuai
Copy link
Contributor

yhuai commented Dec 17, 2015

Thank you @hvanhovell ! I am going to merge it. Let's have a follow-up PR to add more docs to those newly added functions. Also, can we add tests like the following?

sql(s"""
           |select  p_mfgr,p_name, p_size,
           |avg(null) over(distribute by p_mfgr sort by p_name) as avg
           |from part
      """.stripMargin).show

    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2))
      .toDF("key", "value")
    val window = Window.orderBy()
      df.select(
          $"key", $"value",
          sum(lit(null)).over(window)).show

Basically, we test cases using null literals as the argument of a window function (I tested them manually and the results look good).

@asfgit asfgit closed this in 658f66e Dec 17, 2015
@hvanhovell
Copy link
Contributor Author

@yhuai & @davies thanks for the reviews!

asfgit pushed a commit that referenced this pull request Dec 31, 2015
…-up (docs & tests)

This PR is a follow-up for PR #9819. It adds documentation for the window functions and a couple of NULL tests.

The documentation was largely based on the documentation in (the source of)  Hive and Presto:
* https://prestodb.io/docs/current/functions/window.html
* https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

I am not sure if we need to add the licenses of these two projects to the licenses directory. They are both under the ASL. srowen any thoughts?

cc yhuai

Author: Herman van Hovell <[email protected]>

Closes #10402 from hvanhovell/SPARK-8641-docs.
davies pushed a commit to davies/spark that referenced this pull request Mar 9, 2016
This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.

This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.

cc rxin / yhuai

Author: Herman van Hovell <[email protected]>

Closes apache#9819 from hvanhovell/SPARK-8641-2.
davies pushed a commit to davies/spark that referenced this pull request Mar 9, 2016
This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.

This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.

cc rxin / yhuai

Author: Herman van Hovell <[email protected]>

Closes apache#9819 from hvanhovell/SPARK-8641-2.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants