
Conversation

@koertkuipers
Contributor

What changes were proposed in this pull request?

Add mapValues to KeyValueGroupedDataset

How was this patch tested?

New test in DatasetSuite for groupBy function, mapValues, flatMap

@SparkQA

SparkQA commented Jun 6, 2016

Test build #60054 has finished for PR 13526 at commit 3494ec5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* data. The grouping key is unchanged by this.
*
* {{{
* // Create values grouped by key from a Dataset[(K, V)]
Contributor

The example is wrong here. Can we use a proper Java 8 example?

Contributor Author

koertkuipers commented Jun 6, 2016

Oh, it should be groupByKey, not groupBy. Whoops.
I will also add a comment that it's Scala, I guess,

and put a proper Java example in the Java API.

Contributor

My comment was targeting the wrong method. I was referring to the one below; that one should come with a Java 8 example.

@rxin
Contributor

rxin commented Jun 6, 2016

cc @cloud-fan too

@cloud-fan
Contributor

cloud-fan commented Jun 7, 2016

I doubt this feature is really useful. I think users can easily call map on the values during mapGroups, e.g.

val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds.groupByKey(_._1).mapGroups { case (k, vs) => (k, vs.map(_._2).sum) }
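(For reference, on that data this yields ("a", 30), ("b", 3), and ("c", 1), modulo group ordering.)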

On the other hand, this implementation is kind of inefficient: every time we call mapValues, we append new columns to the underlying plan.

If we do want to add this feature, we should add optimizer rules for this case. But that's not trivial, and may not be worth it for such a rare case.

@SparkQA

SparkQA commented Jun 7, 2016

Test build #60082 has finished for PR 13526 at commit 52b19c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2016

Test build #60080 has finished for PR 13526 at commit 315f8ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koertkuipers
Contributor Author

koertkuipers commented Jun 7, 2016

See this conversation:
https://mail-archives.apache.org/mod_mbox/spark-user/201602.mbox/%3CCAAswR-7KQFMxd_Cpr-_WdyGAfh+RAreCM9OLm5jKXFk14FcHGg@mail.gmail.com%3E

mapGroups is not a very interesting API: without support for secondary sort (and hence fold operations), pushing all the values into the reducer never really makes sense. So the interesting APIs are reduceGroups (once it's fixed to be efficient and not use mapGroups) and agg.
How do you transform the values before they go into reduceGroups? You cannot do this currently, which is why we need something like mapValues. With Aggregators you can indeed do something similar inside the Aggregator (since the input type is not equal to the buffer type), but this leads to all Aggregators currently taking some kind of input-transform function, which hints at a suboptimal API and a pattern that should be generalized and extracted.

I am curious to know why appending a column is inefficient, especially when it never materializes? I am open to different designs.

About this being a rare case: I would argue the opposite. I expect to see a lot of key-value datasets (Dataset[(K, V)]) in our code base, and on those a lot of operations like ds.groupByKey(_._1).mapValues(_._2).reduceGroups(...), since this is the most natural translation of many RDD algorithms.
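For concreteness, here is a minimal sketch of that pattern (assuming a local SparkSession and the mapValues API proposed in this PR; names are illustrative):

import org.apache.spark.sql.SparkSession

// Minimal sketch; assumes a local SparkSession.
val spark = SparkSession.builder().master("local[*]").appName("mapValuesSketch").getOrCreate()
import spark.implicits._

val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()

// the natural translation of rdd.reduceByKey(_ + _):
val sums = ds.groupByKey(_._1).mapValues(_._2).reduceGroups(_ + _)
sums.show()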

@koertkuipers
Contributor Author

koertkuipers commented Jun 7, 2016

Can you explain a bit what is inefficient and would need an optimizer rule?
Is it the key being appended, and then the new values being appended? So is the challenge to turn the two appends into one append?
Thanks!

@cloud-fan
Contributor

OK now I agree this is a useful API.

For performance, I would expect ds.groupByKey(_._1).mapValues(_._2).mapGroups { case (k, vs) => (k, vs.sum) } to be at least as fast as ds.groupByKey(_._1).mapGroups { case (k, vs) => (k, vs.map(_._2).sum) }. But the current implementation looks like it is not?

I'll take a closer look tomorrow, and let's discuss the best way to do it.

@koertkuipers
Contributor Author

OK, I will study the physical plans for both and try to understand why one would be slower.

@koertkuipers
Contributor Author

scala> val x = Seq(("a", 1), ("b", 2)).toDS
x: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

scala> x.groupByKey(_._1).mapValues(_._2).reduceGroups(_ + _).explain
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2, true]._1, true) AS value#36, input[0, scala.Tuple2, true]._2 AS value#37]
+- MapGroups <function2>, value#32.toString, value#34: int, [value#32], [value#34], obj#35: scala.Tuple2
   +- *Sort [value#32 ASC], false, 0
      +- Exchange hashpartitioning(value#32, 200)
         +- *Project [value#34, value#32]
            +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#34]
               +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#32]
                  +- LocalTableScan [_1#28, _2#29]

It seems the two AppendColumns are not collapsed.

@cloud-fan
Contributor

A possible approach may be to just keep the function given by mapValues, and apply it before calling the function given by mapGroups. By doing this we at least won't make performance worse, as the underlying plan doesn't change.

@koertkuipers
Contributor Author

koertkuipers commented Jun 8, 2016

The tricky part with that is that (ds: Dataset[(K, V)]).groupByKey(_._1).mapValues(_._2) should return a
KeyValueGroupedDataset[K, V].
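A toy, non-Spark sketch of why the return type is the sticking point: composing the mapValues function lazily forces an extra type parameter for the original values, which KeyValueGroupedDataset[K, V] does not have. All names below are hypothetical, not Spark internals:

// Toy sketch (plain Scala): keep the mapValues function and compose it lazily.
final class LazyGrouped[K, T, V](data: Map[K, Seq[T]], extract: T => V) {
  def mapValues[W](f: V => W): LazyGrouped[K, T, W] =
    new LazyGrouped(data, extract andThen f)
  def mapGroups[U](f: (K, Iterator[V]) => U): Seq[U] =
    data.toSeq.map { case (k, ts) => f(k, ts.iterator.map(extract)) }
}
// Note the extra type parameter T for the original values: a
// KeyValueGroupedDataset[K, V] has nowhere to carry it.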

@koertkuipers
Contributor Author

koertkuipers commented Jun 8, 2016

Could we "rewind"/undo the append for the key and change it to a map that creates the new data values and key? That is, remove one append and replace it with another operation?

child)
}

def apply[T : Encoder, U : Encoder](
Contributor

Here you use T : Encoder, i.e. with spaces before and after : while...

@SparkQA

SparkQA commented Jul 15, 2016

Test build #62388 has finished for PR 13526 at commit a2bef60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Oct 18, 2016

cc @cloud-fan

This looks good to me -- but I don't remember why we didn't merge it earlier.

@cloud-fan
Contributor

It lacks an optimizer rule to collapse AppendColumns, but it seems OK to merge it first and add the rule in a follow-up.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67264 has finished for PR 13526 at commit a2bef60.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 20, 2016

Test build #67268 has finished for PR 13526 at commit 1bdc254.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Oct 20, 2016

Alright merging in master. Thanks.

@koertkuipers would you be able to add the optimizer rule?

asfgit closed this in 84b245f on Oct 20, 2016
@koertkuipers
Contributor Author

koertkuipers commented Oct 20, 2016

@rxin I can give it a try (the optimizer rule).

Looking at it, currently what happens under the hood with AppendColumns[T, U](func: T => U, ...) is something like this:

convert UnsafeRow to T
apply func to T to generate U
convert U to UnsafeRow
append the new UnsafeRow to the original one

How do we intend to optimize chaining two AppendColumns with functions func1: T => U and func2: T => W? Catching the two AppendColumns happening after each other is easy, but turning them into a single AppendColumns with, say, a combined function T => (U, W) won't work.

Optimized, it would look something like this under the hood:

convert UnsafeRow to T
apply func1 to T to generate U
convert U to UnsafeRow
apply func2 to T to generate W
convert W to UnsafeRow
append the two new UnsafeRows to the original one.

This would require some serious refactoring; AppendColumnsExec cannot facilitate such a flow currently, I think.
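As a plain-Scala illustration of that fused flow (Array[Any] stands in for UnsafeRow, and the (de)serializers are hypothetical helpers, not Spark's actual row converters):

def fusedAppend[T, U, W](
    row: Array[Any],
    deserialize: Array[Any] => T,
    func1: T => U,
    func2: T => W,
    serializeU: U => Array[Any],
    serializeW: W => Array[Any]): Array[Any] = {
  val t = deserialize(row)      // convert UnsafeRow to T, once
  val u = serializeU(func1(t))  // apply func1 to T, convert U to UnsafeRow
  val w = serializeW(func2(t))  // apply func2 to T, convert W to UnsafeRow
  row ++ u ++ w                 // append the two new UnsafeRows to the original
}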

@cloud-fan
Contributor

To optimize ds.groupBy(...).mapValues(...), yeah, it's not trivial, as you explained above. But for grouped.mapValues(...).mapValues(...), I think it should not be that hard, as it's a pattern of two AppendColumns with functions func1: T => U and func2: U => W, if we treat Project(AppendColumns, ...) as one operator.
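At the function level that chained case is ordinary composition, as in this plain-Scala sketch (types are illustrative):

// f1: T => U and f2: U => W fuse by composition:
val f1: ((String, Int)) => Int = _._2
val f2: Int => Long = _.toLong
val fused: ((String, Int)) => Long = f1 andThen f2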

@koertkuipers
Contributor Author

@cloud-fan I can try to optimize grouped.mapValues(...).mapValues(...), but it's a bit of an anti-pattern (there should be no need to call mapValues twice), so I don't think there is much gain in optimizing it. What do you think?

@cloud-fan
Contributor

That's a good point; let's focus on ds.groupBy(...).mapValues(...) then. One thought: in mapValues we will project away the previous value attributes, so the workflow should be:

convert UnsafeRow to T
apply func1 to T to generate U
convert U to UnsafeRow
apply func2 to T to generate W
convert W to UnsafeRow
join the two new UnsafeRows, without the original row.
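The only change from the earlier fusedAppend sketch is dropping the original row (same hypothetical stand-ins as before):

def fusedProject[T, U, W](
    row: Array[Any],
    deserialize: Array[Any] => T,
    func1: T => U,
    func2: T => W,
    serializeU: U => Array[Any],
    serializeW: W => Array[Any]): Array[Any] = {
  val t = deserialize(row)
  serializeU(func1(t)) ++ serializeW(func2(t))  // original row is not kept
}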

@koertkuipers
Contributor Author

@cloud-fan That makes sense to me, but it's definitely not a quick win to create that optimization.
Let me think about it some more.

@koertkuipers
Contributor Author

If they chain like that, then I think I know how to do the optimization.

But do they? Look, for example, at dataset.groupByKey(...).mapValues(...):

Dataset[T].groupByKey[K] uses function T => K and creates
KeyValueGroupedDataset[K, T]

KeyValueGroupedDataset[K, T].mapValues[W] uses function T => W and creates
KeyValueGroupedDataset[K, W]

So I have T => K and then T => W.
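Written out as code (a sketch; assumes a SparkSession with spark.implicits._ in scope and the Dataset/KeyValueGroupedDataset imports):

val ds: Dataset[(String, Int)] = Seq(("a", 1), ("b", 2)).toDS()
val grouped: KeyValueGroupedDataset[String, (String, Int)] =
  ds.groupByKey(_._1)       // key function:   T => K
val mapped: KeyValueGroupedDataset[String, Int] =
  grouped.mapValues(_._2)   // value function: T => W, not K => W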

On Thu, Oct 20, 2016 at 8:26 PM, Wenchen Fan [email protected] wrote:

2 chained AppendColumns will have 2 functions: T => U and U => W, so we can combine them this way:
convert UnsafeRow to T
apply func to T to generate U
apply func to U to generate W
convert W to UnsafeRow
append the new UnsafeRow to the original one



robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017