Conversation

@jtengyp

@jtengyp jtengyp commented May 8, 2017

In compute, group each iterator into multiple groups, reducing repeated data fetching.

What changes were proposed in this pull request?

In compute, group each iterator into multiple groups. The data behind the second (inner) iterator is then fetched (num of data) / groupSize times rather than (num of data) times.
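
For context, here is a minimal sketch of the idea. The current compute body is from CartesianRDD.scala; the grouped variant mirrors the diff under review below, with the inner loop variables i and j assumed from the review discussion rather than copied verbatim from the patch:

  // Current behavior: rdd2's partition iterator is re-created for every single
  // element of rdd1, so rdd2's data is fetched once per rdd1 element.
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  // Grouped variant (sketch): iterate over groups, so rdd2's iterator is
  // re-created once per group of rdd1 elements instead of once per element,
  // then flatten the groups back into element pairs.
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    val groupSize = 500
    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
         i <- x; j <- y) yield (i, j)
  }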

How was this patch tested?

The existing unit tests.

@srowen
Member

srowen commented May 8, 2017

Please read http://spark.apache.org/contributing.html
For example, "Update X" is never sufficient as a title.
How does this avoid fetching? How much difference does it make?

@jtengyp jtengyp changed the title Update CartesianRDD.scala Optimize the CartesianRDD May 8, 2017
Diff under review in CartesianRDD.scala:

-         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+    val groupSize = 500;
+    for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
+         y <- rdd2.iterator(currSplit.s2, context).grouped(groupSize);
Member

One disadvantage I can think of now is the longer waiting time for the first element.

Author

This is indeed a disadvantage.

Member

Pardon, doesn't this change the type of the result? You're iterating over groupings, not elements, and emitting pairs of groups. As below, but maybe I'm missing something.

scala> val foo = List(1,2,3)
foo: List[Int] = List(1, 2, 3)

scala> val bar = List(4,5,6)
bar: List[Int] = List(4, 5, 6)

scala> for (x <- foo; y <- bar) yield (x, y)
res0: List[(Int, Int)] = List((1,4), (1,5), (1,6), (2,4), (2,5), (2,6), (3,4), (3,5), (3,6))

scala> (for (x <- foo.grouped(2); y <- bar.grouped(2)) yield (x, y)).foreach(println)
(List(1, 2),List(4, 5))
(List(1, 2),List(6))
(List(3),List(4, 5))
(List(3),List(6))

Contributor

The actual yield is on (i, j) and not (x, y) - the next line adds the iteration over the groupings :-)
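
To make that concrete, extending the REPL example above with the inner iteration (an illustrative check, not code from this PR) shows that plain element pairs come back, just in a different order:

scala> (for (x <- foo.grouped(2); y <- bar.grouped(2); i <- x; j <- y) yield (i, j)).toList
res1: List[(Int, Int)] = List((1,4), (1,5), (2,4), (2,5), (1,6), (2,6), (3,4), (3,5), (3,6))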

Contributor

I agree with @viirya - there is also an implicit assumption about size here: the batch will get deserialized into memory.
By default, we have kept the iterator model in Spark without needing to buffer (IIRC).

Contributor

@ConeyLiu ConeyLiu May 8, 2017


I'm working on this too, but my optimization may be similar to the PR @viirya opened before: caching the second iterator locally. The code is ready; I may open a PR soon. For this patch, I worry about whether we can accurately control the size of the buffer. Should we cache it with BlockManager or MemoryConsumer?

Member

Oh haha right. Hm, but isn't this better solved 'upstream' by buffering an iterator somewhere? Or just buffering the iterator right here?
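
As an illustration of the "buffer the iterator right here" idea, here is a minimal sketch (not code from this PR or from the related discussion; it assumes the whole rdd2 partition fits in memory and ignores spilling or size control):

  // Sketch: materialize rdd2's partition once, then iterate over the in-memory
  // copy, so rdd2 is fetched a single time per partition pair instead of once
  // per rdd1 element. toBuffer avoids needing a ClassTag for U.
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    val buffered = rdd2.iterator(currSplit.s2, context).toBuffer
    for (x <- rdd1.iterator(currSplit.s1, context); y <- buffered) yield (x, y)
  }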

@viirya
Member

viirya commented May 8, 2017

Maybe create a JIRA and update the title per the Spark PR convention. Since this should be a performance improvement, benchmark numbers showing the difference are expected.

@jtengyp jtengyp changed the title Optimize the CartesianRDD Optimize the CartesianRDD to reduce repeatedly data fetching May 8, 2017
@jtengyp
Author

jtengyp commented May 8, 2017

Here is my test:
Environment: 3 workers, each with 10 cores, 30G memory, and 1 executor.
Test data: users: 480,189 rows, each a 10-dim vector; items: 17,770 rows, each a 10-dim vector.
With the default CartesianRDD, the cartesian time is 2420.7s.
With this proposal, the cartesian time is 45.3s, about 50x faster than the original method.

val groupSize = 1000
for (x <- rdd1.iterator(currSplit.s1, context).grouped(groupSize);
     y <- rdd2.iterator(currSplit.s2, context);
     i <- x) yield (i, y)
I found that the above change, which buffers only the first RDD, gives even better performance: rdd2's iterator is re-created once per group of rdd1 elements rather than once per rdd1 element, and each emitted pair still keeps the original element types.
With this PR the cartesian time was 41.05s, and with this change it drops to 27.35s, about 88x faster than the original method.
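
For a quick local sanity check of this variant (illustrative only, reusing foo and bar from the earlier REPL example), grouping only the first collection also yields plain element pairs, again in a different order:

scala> (for (x <- foo.grouped(2); y <- bar; i <- x) yield (i, y)).toList
res2: List[(Int, Int)] = List((1,4), (2,4), (1,5), (2,5), (1,6), (2,6), (3,4), (3,5), (3,6))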

@SparkQA

SparkQA commented May 8, 2017

Test build #3697 has finished for PR 17898 at commit d2cbcdd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jtengyp jtengyp changed the title Optimize the CartesianRDD to reduce repeatedly data fetching [SPARK-20638][Core]Optimize the CartesianRDD to reduce repeatedly data fetching May 9, 2017
@srowen
Member

srowen commented May 10, 2017

@jtengyp I think we won't proceed with this version, so this can be closed, but see the discussion at #17936

@jtengyp jtengyp closed this May 15, 2017