
Conversation

@aray
Contributor

@aray aray commented Jul 31, 2015

This adds a pivot method to the DataFrame API.

Following the lead of cube and rollup, this adds a Pivot operator that is translated into an Aggregate by the analyzer.

Currently the syntax is like:
courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))

Would we also, or alternatively, be interested in the following syntax?

courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
//or
courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))

Later we can add it to SQLParser, but as Hive doesn't support it we can't add it there, right?

Also what would be the suggested Java friendly method signature for this?

@rxin
Contributor

rxin commented Aug 1, 2015

Jenkins, ok to test.

@aray
Contributor Author

aray commented Aug 3, 2015

@rxin it looks like Jenkins forgot about building this. Can you help trigger the build again?

@SparkQA

SparkQA commented Aug 3, 2015

Test build #1319 has finished for PR 7841 at commit 599e9e0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@rxin
Contributor

rxin commented Aug 12, 2015

@aray FYI this didn't make it into the 1.5 release (was submitted too close to the feature freeze deadline), but we will try to include it in Spark 1.6.

Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@JoshRosen
Contributor

@rxin, do you want to revisit this now for 1.6?

aray added 3 commits October 22, 2015 17:45
courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
Also, fixed master merge.
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@aray
Contributor Author

aray commented Oct 23, 2015

@rxin and @JoshRosen, this is ready for review now.

@rxin
Contributor

rxin commented Oct 23, 2015

@aray Thanks a lot for updating this. To help api design, can you take a look at other frameworks and see what their signatures look like?

@aray
Contributor Author

aray commented Oct 23, 2015

@rxin here is my summary of other frameworks' APIs.

I'm going to use an example dataset from the pandas docs for all the examples (as df):

A   B    C     D
foo one  small 1
foo one  large 2
foo one  large 2
foo two  small 3
foo two  small 3
bar one  large 4
bar one  small 5
bar two  small 6
bar two  large 7

This API

scala> df.groupBy("A", "B").pivot("C", "small", "large").sum("D").show
+---+---+-----+-----+
|  A|  B|small|large|
+---+---+-----+-----+
|foo|two|    6| null|
|bar|two|    6|    7|
|foo|one|    1|    4|
|bar|one|    5|    4|
+---+---+-----+-----+

scala> df.groupBy("A", "B").pivot("C", "small", "large").agg(sum("D"), avg("D")).show
+---+---+------------+------------+------------+------------+
|  A|  B|small sum(D)|small avg(D)|large sum(D)|large avg(D)|
+---+---+------------+------------+------------+------------+
|foo|two|           6|         3.0|        null|        null|
|bar|two|           6|         6.0|           7|         7.0|
|foo|one|           1|         1.0|           4|         2.0|
|bar|one|           5|         5.0|           4|         4.0|
+---+---+------------+------------+------------+------------+

scala> df.pivot(Seq($"A", $"B"), $"C", Seq("small", "large"), sum($"D")).show
+---+---+-----+-----+
|  A|  B|small|large|
+---+---+-----+-----+
|foo|two|    6| null|
|bar|two|    6|    7|
|foo|one|    1|    4|
|bar|one|    5|    4|
+---+---+-----+-----+

We require a list of values for the pivot column because we need to know the output columns of the operator ahead of time. Pandas and reshape2 do not require this, but the comparable SQL operators do. We also allow multiple aggregations, which not all implementations allow.
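
To make the schema constraint concrete, here is a minimal pure-Python sketch (hypothetical helper names, not Spark code) of a sum-pivot whose output columns are fixed by the supplied value list before any data is read:

```python
def pivot(rows, group_key, pivot_col, values, agg_col):
    """Pivot `rows` (a list of dicts): one output column per entry in `values`."""
    out = {}
    for row in rows:
        key = tuple(row[k] for k in group_key)
        # The output schema is fixed up front from `values`, before any data is seen.
        bucket = out.setdefault(key, {v: None for v in values})
        v = row[pivot_col]
        if v in values:  # rows outside the value list are dropped, as in SQL PIVOT
            bucket[v] = (bucket[v] or 0) + row[agg_col]  # sum aggregation
    return out

rows = [
    {"A": "foo", "B": "one", "C": "small", "D": 1},
    {"A": "foo", "B": "one", "C": "large", "D": 2},
    {"A": "foo", "B": "one", "C": "large", "D": 2},
    {"A": "foo", "B": "two", "C": "small", "D": 3},
    {"A": "foo", "B": "two", "C": "small", "D": 3},
    {"A": "bar", "B": "one", "C": "large", "D": 4},
    {"A": "bar", "B": "one", "C": "small", "D": 5},
    {"A": "bar", "B": "two", "C": "small", "D": 6},
    {"A": "bar", "B": "two", "C": "large", "D": 7},
]
result = pivot(rows, ["A", "B"], "C", ["small", "large"], "D")
# result[("foo", "two")] == {"small": 6, "large": None}, matching the table above
```

Groups with no row for a given pivot value keep None, matching the nulls in the DataFrame output shown above.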

pandas

The comparable method for pandas is pivot_table(data, values=None, index=None, columns=None, aggfunc='mean', fill_value=None, margins=False, dropna=True)

Example

>>> pivot_table(df, values='D', index=['A', 'B'], columns=['C'], aggfunc=np.sum)
          small  large
foo  one  1      4
     two  6      NaN
bar  one  5      4
     two  6      7

Pandas also allows multiple aggregations:

>>> pivot_table(df, values='D', index=['A', 'B'], columns=['C'], aggfunc=[np.sum, np.average])
          sum       average      
C       large small   large small
A   B                            
bar one     4     5       4     5
    two     7     6       7     6
foo one     4     1       2     1
    two   NaN     6     NaN     3

References

See also: pivot, stack, unstack.

reshape2 (R)

The comparable method for reshape2 is dcast(data, formula, fun.aggregate = NULL, ..., margins = NULL, subset = NULL, fill = NULL, drop = TRUE, value.var = guess_value(data))

> dcast(df, A + B ~ C, sum)
Using D as value column: use value.var to override.
    A   B large small
1 bar one     4     5
2 bar two     7     6
3 foo one     4     1
4 foo two     0     6

Note that by default dcast fills missing cells with the value obtained by applying fun.aggregate to a zero-length vector.
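
As a quick stand-in in Python (mirroring R's behavior for sum over an empty vector), summing an empty sequence yields 0, which is why the dcast output above shows 0 for the foo/two/large cell instead of NA/null:

```python
# reshape2's dcast fills a missing cell with fun.aggregate applied to an
# empty vector. Python's sum behaves the same way on an empty list:
empty_sum = sum([])
print(empty_sum)  # 0
```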

References

See also: melt.

MS SQL Server

SELECT *
FROM df
pivot (sum(D) for C in ([small], [large])) p

http://sqlfiddle.com/#!3/cf887/3/0

Oracle 11g

SELECT *
FROM df
pivot (sum(D) for C in ('small', 'large')) p

http://sqlfiddle.com/#!4/29bc5/3/0

Oracle also allows multiple aggregations, with output similar to this API:

SELECT *
FROM df
pivot (sum(D) as sum, avg(D) as avg for C in ('small', 'large')) p

http://sqlfiddle.com/#!4/29bc5/5/0

Let me know if I can do anything else to help this along. Also, would you mind adding me to the Jenkins whitelist so I can test it?

@marmbrus
Contributor

ok to test

@SparkQA

SparkQA commented Oct 23, 2015

Test build #44249 has finished for PR 7841 at commit f2827ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@rxin
Contributor

rxin commented Oct 23, 2015

I like your 2nd interface more (group by and then pivot), since it is easier to get that working for both Java and Scala. We can implement a simpler interface for Python/R that's closer to existing frameworks.

How hard would it be to not require the values?

@aray
Contributor Author

aray commented Oct 24, 2015

@rxin, not requiring the values would necessitate a separate query for the distinct values of the column before the pivot query. It looks like at least some DataFrame operations (e.g., drop) would need the result, so even if we made Pivot.output lazy we would be running an unexpected job.

If a user really didn't want to specify the values, they can explicitly do the query:

    df.groupBy("A", "B").pivot("C", df.select("C").distinct.collect.map(_.getString(0)): _*).sum("D")

Needing to know the output columns of an operator for analysis/planning is probably also why the other SQL implementations require the values (technically Oracle supports omitting them, but only in XML mode, where you essentially get just one column).
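
The workaround described above amounts to a two-pass scheme, sketched here in plain Python (hypothetical helper, not part of this patch): a first pass collects the distinct values of the pivot column, and only then can the pivot's output schema be fixed.

```python
def distinct_values(rows, col):
    """First pass: collect distinct pivot-column values in order of appearance.
    In Spark this corresponds to an extra job (df.select(col).distinct.collect)."""
    seen = []
    for row in rows:
        if row[col] not in seen:
            seen.append(row[col])
    return seen

rows = [{"C": "small"}, {"C": "large"}, {"C": "small"}]
values = distinct_values(rows, "C")
# values == ["small", "large"]; these then become the pivoted output columns
```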

Merge branch 'master' of https://github.com/apache/spark into sql-pivot

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@SparkQA

SparkQA commented Oct 30, 2015

Test build #44643 has finished for PR 7841 at commit 403f966.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@rxin
Contributor

rxin commented Nov 6, 2015

@aray sorry, I was away for Spark Summit; I'm back now and will get to this today.

Merge branch 'master' of https://github.com/apache/spark into sql-pivot

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@SparkQA

SparkQA commented Nov 8, 2015

Test build #45316 has finished for PR 7841 at commit d8e473c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@SparkQA

SparkQA commented Nov 9, 2015

Test build #45366 has finished for PR 7841 at commit 2417548.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@rxin
Contributor

rxin commented Nov 9, 2015

@aray I talked to a few more people about this. Most like the 2nd API more (groupBy.pivot.agg).

I think it'd also be better to remove the requirement to specify values, e.g. just take in a column without the values. So it looks like

courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))

Can you update the pull request? Thanks.

@rxin
Contributor

rxin commented Nov 9, 2015

BTW we can also later add a variant that allows users to specify values directly, in order to avoid materializing the intermediate data.

…ot provided. Add unit tests for this scenario.
@aray
Contributor Author

aray commented Nov 9, 2015

@rxin Updated, the values are now optional.

Contributor

can we remove this?

Contributor Author

sure

Contributor

Why?

@yhuai
Contributor

yhuai commented Nov 11, 2015

@aray This is very cool! Here are a few things I'd like to discuss.

  1. Should we always ask users to provide pivot values?
  2. For the type of pivot values, should we just use String? Or, we should use Literal Column?
  3. For the column name of a pivot aggregation column, should we always show the aggregate function in the column name? (Right now, we only show the function name if we have more than one aggregation.)

@SparkQA

SparkQA commented Nov 11, 2015

Test build #45645 has finished for PR 7841 at commit 88dd513.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T], val offset: Int)
    • class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int, val step: Int)
    • case class Pivot(

- Use Literals for the pivot column values instead of strings.
- Change the separator when using multiple aggregates to `_` instead of a space.
- Some additional unit testing
@aray
Contributor Author

aray commented Nov 11, 2015

@yhuai RE your questions (3 was already addressed above):

  1. Should we always ask users to provide pivot values?

The argument for not requiring values, I think, is convenience and also similarity to the other non-SQL tools mentioned above. The downside is performance, but since we give users the option to specify values I don't think that is a problem.

  2. For the type of pivot values, should we just use String? Or, we should use Literal Column?

I initially used strings as the type since that is the common usage scenario. But I agree that using Literals is the better solution and will avoid casts, which could hurt performance. For convenience I kept the second method (changed to pivot(pivotColumn: String, values: Any*)), which I think will be the preferred way to use pivot since it is more succinct.

I really appreciate the review. Let me know if I can do anything else to help!

Contributor

Seems we still need to check the number of children and make sure we have a single child?

Contributor Author

It should now work fine with aggregate functions that have multiple children, as long as they ignore updates when all values are null. For example, Corr should work since it only updates its aggregation buffer if both of its arguments are non-null.
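
As a sketch of that property (a hypothetical class, not the Spark Corr implementation), an aggregate over two children that skips its update whenever an input is null accumulates correctly even when some rows feed it nulls:

```python
class PairwiseSum:
    """Toy two-child aggregate: only updates its buffer when both inputs
    are non-null, analogous to how Corr behaves under the pivot rewrite."""
    def __init__(self):
        self.total = 0
        self.count = 0

    def update(self, x, y):
        if x is None or y is None:  # skip the update, as Corr does
            return
        self.total += x + y
        self.count += 1

agg = PairwiseSum()
for x, y in [(1, 2), (None, 5), (3, None), (4, 4)]:
    agg.update(x, y)
# agg.total == 11 and agg.count == 2: the null-bearing rows were ignored
```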

Contributor

oh, yes. You are right.

@SparkQA

SparkQA commented Nov 11, 2015

Test build #45659 has finished for PR 7841 at commit 12a8270.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@aray
Contributor Author

aray commented Nov 11, 2015

@yhuai I think this addresses everything we discussed; let me know if I missed anything or if there is anything else I can do. Again, thanks for the code review.

@yhuai
Contributor

yhuai commented Nov 11, 2015

LGTM pending jenkins.

@SparkQA

SparkQA commented Nov 12, 2015

Test build #45673 has finished for PR 7841 at commit 676f1ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Pivot(

@yhuai
Contributor

yhuai commented Nov 12, 2015

Thanks! Merging to master and branch 1.6.

asfgit pushed a commit that referenced this pull request Nov 12, 2015
This adds a pivot method to the dataframe api.

Following the lead of cube and rollup this adds a Pivot operator that is translated into an Aggregate by the analyzer.

Currently the syntax is like:
~~courseSales.pivot(Seq($"year"), $"course", Seq("dotNET", "Java"), sum($"earnings"))~~

~~Would we be interested in the following syntax also/alternatively? and~~

    courseSales.groupBy($"year").pivot($"course", "dotNET", "Java").agg(sum($"earnings"))
    //or
    courseSales.groupBy($"year").pivot($"course").agg(sum($"earnings"))

Later we can add it to `SQLParser`, but as Hive doesn't support it we can't add it there, right?

~~Also what would be the suggested Java friendly method signature for this?~~

Author: Andrew Ray <[email protected]>

Closes #7841 from aray/sql-pivot.

(cherry picked from commit b8ff688)
Signed-off-by: Yin Huai <[email protected]>
@asfgit asfgit closed this in b8ff688 Nov 12, 2015
@rxin
Contributor

rxin commented Nov 12, 2015

@aray do you want to submit a pull request for python api too?

@aray
Contributor Author

aray commented Nov 12, 2015

@rxin sure, I'll put together a PR for the Python API tonight.

dskrvk pushed a commit to dskrvk/spark that referenced this pull request Nov 13, 2015

Author: Andrew Ray <[email protected]>

Closes apache#7841 from aray/sql-pivot.
@rxin
Contributor

rxin commented Jun 9, 2016

@aray this pull request was highlighted in http://www.slideshare.net/databricks/deep-dive-into-catalyst-apache-spark-20s-optimizer

@pushpam002

thank you
