
Conversation

@douglaz
Contributor

@douglaz douglaz commented May 18, 2014

Adds cogroup for 4 RDDs.

@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented May 19, 2014

Thanks for submitting this. Instead of allowing 4 (and maybe 5), users can certainly use the CoGroupedRDD constructor to build cogroups of an arbitrary number of RDDs. If that is inconvenient, perhaps we should think about a cogroup interface that either takes varargs or just a sequence/list of RDDs?
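
To make that concrete, here is a minimal, hypothetical sketch of such a sequence-based interface (cogroupAll and its parameter names are invented for illustration; this is neither this PR's implementation nor Spark's API). It also shows the drawback discussed below: one signature for any number of inputs means the per-RDD value types collapse to Any, and RDD's invariance forces callers to widen their inputs up front.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object CogroupSeq {
  // Groups any number of (K, Any) RDDs by key; each group comes back as an
  // untyped Seq[Any], so the original value types are gone.
  def cogroupAll[K: ClassTag](inputs: Seq[RDD[(K, Any)]], part: Partitioner): RDD[(K, Seq[Seq[Any]])] = {
    require(inputs.nonEmpty, "need at least one RDD")
    val n = inputs.size
    // Tag every record with the index of the RDD it came from, then union and group.
    val tagged: RDD[(K, (Int, Any))] = inputs.zipWithIndex
      .map { case (rdd, i) => rdd.map { case (k, v) => (k, (i, v)) } }
      .reduce(_ union _)
    tagged.groupBy(_._1, part).map { case (k, rows) =>
      val buckets = Array.fill(n)(Seq.newBuilder[Any])
      rows.foreach { case (_, (i, v)) => buckets(i) += v }
      (k, buckets.map(_.result()).toSeq)
    }
  }
}

A caller holding, say, an RDD[(String, View)] would still have to widen it to RDD[(String, Any)] up front and cast each group back afterwards, which is essentially the boilerplate douglaz illustrates below.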

@douglaz
Contributor Author

douglaz commented May 20, 2014

Yes, the user can instantiate the RDD directly, and yes, this is inconvenient. An interface for doing this would be no less inconvenient if it has the same drawback: you need to explicitly cast the resulting sequences back to their original types.

Limiting the user to cogrouping 3 RDDs is pretty much like limiting tuples to 3 elements. You may have technical reasons for that limit, but it isn't reasonable for practical purposes. You can't just say: if you need a tuple with more than 3 elements, use lists instead.

For tuples the current limit is 22, which is "enough for everyone". For cogroups the limit should be lower, but certainly above 3.

@pwendell
Contributor

Isn't it possible to just do new CoGroupedRDD(Seq(rdd1, rdd2, rdd3, ..., rddn))? That seems like the same number of lines of code as rdd1.cogroup(rdd2, rdd3, ..., rddn).

We have many functions like this, including union - I'm not sure we want to create many definitions of each of these functions.

@douglaz
Contributor Author

douglaz commented May 20, 2014

It isn't just about lines of code; it is about polluting the code with asInstanceOf, the runtime errors this causes, and the risk of wrong pattern matching on Sequences.

Compare this almost-real code using cogroup:

val userHistories = parsedViews.cogroup(parsedBuyOrders, parsedShoppingCarts, parsedSentMails, partitioner = context.partitioner)
  .map(values => {
    val (key, events) = values
    val (groupedViews, groupedBuyOrders, groupedShoppingCarts, groupedSentMails) = events

    val sentMailsProducts = groupedSentMails.flatMap(_.products)

    val validViews = groupedViews.filter(v => !sentMailsProducts.contains(v.productId))

    key -> UserHistory(validViews, groupedBuyOrders, groupedShoppingCarts, groupedSentMails)
  })

With this using CoGroupedRDD:

// Perhaps there is some mistake here and an RDD may be missing
val userHistories = new CoGroupedRDD(Seq(parsedViews, parsedBuyOrders, parsedShoppingCarts, parsedSentMails), part = context.partitioner)
  .map(values => {
    val (key, events) = values

    // Or the match is wrong here
    val Seq(_groupedViews, _groupedBuyOrders, _groupedShoppingCarts, _groupedSentMails) = events

    // Or here we are casting with the wrong type. We'll find out at runtime
    val groupedViews = _groupedViews.asInstanceOf[Seq[UHView]]
    val groupedBuyOrders = _groupedBuyOrders.asInstanceOf[Seq[UHBuyOrder]]
    val groupedShoppingCarts = _groupedShoppingCarts.asInstanceOf[Seq[UHShoppingCartLog]]
    val groupedSentMails = _groupedSentMails.asInstanceOf[Seq[UHSentMail]]

    val sentMailsProducts = groupedSentMails.flatMap(_.products)

    val validViews = groupedViews.filter(v => !sentMailsProducts.contains(v.productId))

    key -> UserHistory(validViews, groupedBuyOrders, groupedShoppingCarts, groupedSentMails)
  })

The second example is clearly more verbose and error-prone.

Comparing cogroup with union misses the point:

  • cogroup may be called with RDDs of different value types and keeps them, thanks to the tuple in its result signature; with union there is just one RDD type.
  • rdd1.union(rdd2).union(rdd3) works very well and is transparent to the user of the resulting RDD, while rdd1.cogroup(rdd2).cogroup(rdd3) is very different from rdd1.cogroup(rdd2, rdd3). Composition works fine for union, but for cogroup we start to get nested types (Seq[Seq[...]]) and, of course, possible performance implications.

A fairer comparison would be with join, because it also keeps the different types and its composition creates tuples of tuples. But in that case I find it very easy and safe to unpack such tuples (see the sketch below). It isn't ideal, but it is better than cogroup in the same situation. Of course, I wouldn't oppose creating an interface for joins with more elements.
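
To illustrate both composition points with a runnable sketch (the local master and the RDD names are assumptions made up for the example):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CompositionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("composition-sketch"))
    val views  = sc.parallelize(Seq(("u1", "page-a"), ("u2", "page-b")))
    val orders = sc.parallelize(Seq(("u1", 42)))
    val carts  = sc.parallelize(Seq(("u1", 9.99), ("u2", 3.14)))

    // Chaining two-way cogroups nests each previous result inside the next one:
    val chained: RDD[(String, (Iterable[(Iterable[String], Iterable[Int])], Iterable[Double]))] =
      views.cogroup(orders).cogroup(carts)

    // A single multi-way cogroup keeps the groups side by side, each with its own element type:
    val flat: RDD[(String, (Iterable[String], Iterable[Int], Iterable[Double]))] =
      views.cogroup(orders, carts)

    // Chained joins also nest, but the result is a tuple of tuples that a
    // pattern match unpacks safely at compile time:
    val joined: RDD[(String, ((String, Int), Double))] = views.join(orders).join(carts)
    joined.map { case (user, ((page, order), cartTotal)) => (user, page, order, cartTotal) }
      .collect()
      .foreach(println)

    sc.stop()
  }
}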

But I agree that we should really discuss this. If such operations don't make it into main Spark, then external libraries (using implicits) will be created to handle these cases. I think it would be better if Spark could handle them without making the user deal with boilerplate or resort to external libraries.

@markhamstra
Contributor

To throw another wrench into the union analogy, there is also the little-used SparkContext#union, which has signatures for both Seq[RDD[T]] and varargs RDD[T].
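
For reference, a quick usage sketch of those two SparkContext#union forms (a local SparkContext is assumed); note that both require a single element type T, which is exactly what makes union easier to generalize than cogroup:

import org.apache.spark.{SparkConf, SparkContext}

object UnionForms {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("union-forms"))
    val a = sc.parallelize(Seq(1, 2))
    val b = sc.parallelize(Seq(3, 4))
    val c = sc.parallelize(Seq(5, 6))

    // Seq[RDD[T]] form and varargs form: same result, different calling convention.
    val viaSeq     = sc.union(Seq(a, b, c))
    val viaVarargs = sc.union(a, b, c)

    println(viaSeq.collect().toList)       // List(1, 2, 3, 4, 5, 6)
    println(viaVarargs.collect().toList)   // List(1, 2, 3, 4, 5, 6)
    sc.stop()
  }
}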

@pwendell
Contributor

Hey @douglaz, thanks for giving the explanation. This makes a lot of sense... the issue is about compile-time type checking, because a varargs signature drops the value type (I didn't realize that). This will need to exist somewhere; I think it could be merged into Spark core or live in user libraries. Let me ask around the committers a bit and try to get a consensus.
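
A minimal model of that point in plain Scala, with no Spark involved (cogroup3 and cogroupN are invented names for illustration): fixed-arity type parameters carry each input's value type into the result, while a single varargs parameter forces one shared value type, which for mixed inputs is inferred as a common supertype such as Any.

object VarargsTypeLoss {
  // Fixed arity: A, B and C survive into the result type.
  def cogroup3[K, A, B, C](a: Map[K, A], b: Map[K, B], c: Map[K, C]): Map[K, (Seq[A], Seq[B], Seq[C])] = {
    val keys = a.keySet ++ b.keySet ++ c.keySet
    keys.map(k => k -> (a.get(k).toList, b.get(k).toList, c.get(k).toList)).toMap
  }

  // Varargs: every input must share one value type V, so mixed-type calls
  // collapse to Any and the caller is back to casting, as in the CoGroupedRDD
  // example earlier in this thread.
  def cogroupN[K, V](inputs: Map[K, V]*): Map[K, Seq[Seq[V]]] = {
    val keys = inputs.flatMap(_.keySet).toSet
    keys.map(k => k -> inputs.toList.map(_.get(k).toList)).toMap
  }

  def main(args: Array[String]): Unit = {
    val views  = Map("u1" -> "page-a")
    val orders = Map("u1" -> 42)
    val carts  = Map("u2" -> 3.14)

    val typed = cogroup3(views, orders, carts) // Map[String, (Seq[String], Seq[Int], Seq[Double])]
    val lossy = cogroupN(views, orders, carts) // Map[String, Seq[Seq[Any]]]: V was inferred as Any
    println(typed("u1"))
    println(lossy("u1"))
  }
}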

@mateiz
Contributor

mateiz commented May 29, 2014

I'd be okay with adding this, but it can be a bit of a slippery slope because people may then want it for joins, etc. as well. But maybe we can just limit it to cogroup right now.

Regarding the pull request though, we should add this API to Python as well. Can you look into what that will take?

@douglaz
Contributor Author

douglaz commented May 29, 2014

I'll take a look at the Python interface soon.

@douglaz
Contributor Author

douglaz commented Jun 5, 2014

Jenkins, test this please.

@douglaz
Contributor Author

douglaz commented Jun 5, 2014

I'm having no luck running the Python tests on my machine. I'll try again later.

@pwendell
Contributor

pwendell commented Jun 5, 2014

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15473/

@pwendell
Contributor

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15655/

@douglaz
Contributor Author

douglaz commented Jun 15, 2014

The tests should pass now.

@pwendell
Contributor

Jenkins, retest this please.

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@AmplabJenkins

Build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15811/

@pwendell
Contributor

@douglaz if you up-merge this with master I think the tests should pass fine (currently it's not merging cleanly). I'd like to get this merged soon if possible, so let me know! Thanks

@douglaz
Contributor Author

douglaz commented Jun 18, 2014

@pwendell, merged with latest master.

@pwendell
Contributor

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

Contributor

Will this break compatibility for users who were building against the previous API? It seems like this is a public API, so we might need to make a second version rather than replace the current one.

Contributor

Okay I looked yet again, this entire file is not exposed in e.g. the docs, so I guess this isn't public.

@mateiz
Contributor

mateiz commented Jun 18, 2014

Hey @douglaz, thanks for updating this. One thing missing here is tests in each of the languages -- please add them so that this code will be tested later.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15866/

@douglaz
Contributor Author

douglaz commented Jun 19, 2014

@pwendell, @mateiz, check if everything is fine now.

@pwendell
Contributor

Jenkins, test this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15953/

@pwendell
Contributor

Looks good - thanks for this. I'm going to merge it.

@asfgit asfgit closed this in 6a224c3 Jun 20, 2014
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Adds cogroup for 4 RDDs.

Author: Allan Douglas R. de Oliveira <[email protected]>

Closes apache#813 from douglaz/more_cogroups and squashes the following commits:

f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case
0e9009c [Allan Douglas R. de Oliveira] Added scala tests
c3ffcdd [Allan Douglas R. de Oliveira] Added java tests
517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith
2f402d5 [Allan Douglas R. de Oliveira] Removed TODO
17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function
7877a2a [Allan Douglas R. de Oliveira] Fixed code
ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark
c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4
e94963c [Allan Douglas R. de Oliveira] Fixed spacing
f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues
d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
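
For completeness, a minimal sketch of what the Scala API added by the commits above lets you write (the local master and the RDD names are made up for the example); the squashed commits also list the matching Java cogroup and a varargs groupWith for PySpark:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FourWayCogroup {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("four-way-cogroup"))
    val views  = sc.parallelize(Seq(("u1", "page-a")))
    val orders = sc.parallelize(Seq(("u1", 42)))
    val carts  = sc.parallelize(Seq(("u2", 3.14)))
    val mails  = sc.parallelize(Seq(("u1", "promo")))

    // Each group keeps the value type of the RDD it came from:
    val histories: RDD[(String, (Iterable[String], Iterable[Int], Iterable[Double], Iterable[String]))] =
      views.cogroup(orders, carts, mails)

    histories.collect().foreach(println)
    sc.stop()
  }
}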