39 commits
687ffbc
This is the it compiles point of replacing Seq with Iterator and JLis…
holdenk Mar 25, 2014
be50246
Calling size on an iterator is not so good if we want to use it after
holdenk Mar 25, 2014
4991af6
Fix some tests
holdenk Mar 25, 2014
88b3329
Fix groupbykey to actually give back an iterator
holdenk Mar 25, 2014
caafa63
I think this might be a bad rabbit hole. Started work to make CoGroup…
holdenk Mar 26, 2014
172705c
Fix Java API suite
holdenk Mar 26, 2014
fe992fe
hmmm try and fix up basic operation suite
holdenk Mar 26, 2014
6698186
Revert "I think this might be a bad rabbit hole. Started work to make…
holdenk Mar 26, 2014
249abde
org.apache.spark.rdd.PairRDDFunctionsSuite passes
holdenk Mar 26, 2014
37888ec
core/tests now pass
holdenk Mar 26, 2014
b1ae51a
Fix some of the types in the streaming JavaAPI suite. Probably still …
holdenk Mar 26, 2014
71e8b9f
I really need to stop calling size on iterators, it is the path of sa…
holdenk Mar 26, 2014
b4e0b1d
Fix style issues
holdenk Mar 26, 2014
c4510e2
Revert this but for now put things in list pandas
holdenk Mar 26, 2014
b4e86a9
Add a join based on the problem in SVD
holdenk Mar 26, 2014
8a5153a
Revert me, but we have some stuff to debug
holdenk Mar 26, 2014
7e533f7
Fix the bug
holdenk Mar 26, 2014
b692868
Revert
holdenk Mar 26, 2014
ec99e32
Revert "Revert this but for now put things in list pandas"
holdenk Mar 26, 2014
fa395c9
Revert "Add a join based on the problem in SVD"
holdenk Mar 26, 2014
4b0eeb9
Switch cast in PairDStreamFunctions
holdenk Mar 26, 2014
ec8cc3e
Fix test issues!
holdenk Mar 27, 2014
e687f21
Fix groupbykey test in JavaAPISuite of streaming
holdenk Mar 27, 2014
a5ee714
oops, was checking wrong iterator
holdenk Mar 27, 2014
88a5cef
Fix cogroup test in JavaAPISuite for streaming
holdenk Mar 27, 2014
c60233a
Start investigating moving to iterators for python API like the Java/…
holdenk Mar 27, 2014
cd1e81c
Try and make pickling list iterators work
holdenk Mar 28, 2014
3bcd81d
Revert "Try and make pickling list iterators work"
holdenk Mar 28, 2014
d052c07
Python tests now pass with iterator pandas
holdenk Mar 28, 2014
4ed579b
Refactor from iterator to iterable
holdenk Apr 7, 2014
66b583d
Fix the core test suite to compile
holdenk Apr 7, 2014
11e730c
Fix streaming tests
holdenk Apr 8, 2014
2d06e10
Fix Java 8 cogroup tests for the new API
holdenk Apr 8, 2014
c5075aa
If guava 14 had iterables
holdenk Apr 8, 2014
eb06216
maybe I should have had a coffee first. use correct import for guava …
holdenk Apr 8, 2014
7a092a3
switch resultitr to resultiterable
holdenk Apr 8, 2014
d3fe909
use toSeq instead
holdenk Apr 8, 2014
77048f8
Fix merge up to master
holdenk Apr 8, 2014
f289536
Fix bad merge, should have been Iterable rather than Iterator
holdenk Apr 8, 2014
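
The commit trail above shows the refactor first replacing `Seq` with `Iterator`, then settling on `Iterable` ("Refactor from iterator to iterable", Apr 7). A standalone sketch, not code from this PR, of why the intermediate `Iterator` step kept biting ("Calling size on an iterator is not so good if we want to use it after"):

```scala
// Standalone sketch: an Iterator is single-pass, so inspecting it consumes it,
// while an Iterable can be traversed again afterwards.
object IteratorVsIterable {
  def main(args: Array[String]): Unit = {
    val data = List(1, 2, 3)

    val it: Iterator[Int] = data.iterator
    println(it.size)    // 3 -- but computing size exhausts the iterator
    println(it.toList)  // List() -- nothing left for the real computation

    val iter: Iterable[Int] = data
    println(iter.size)   // 3
    println(iter.toList) // List(1, 2, 3) -- still all there
  }
}
```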
20 changes: 12 additions & 8 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,27 +220,31 @@ object Bagel extends Logging {
*/
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
val processed = grouped.flatMapValues {
case (_, vs) if vs.size == 0 => None
case (c, vs) =>
val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
.flatMapValues {
case (_, vs) if !vs.hasNext => None
case (c, vs) => {
val (newVert, newMsgs) =
compute(vs(0), c match {
case Seq(comb) => Some(comb)
case Seq() => None
})
compute(vs.next,
c.hasNext match {
case true => Some(c.next)
case false => None
}
)

numMsgs += newMsgs.size
if (newVert.active) {
numActiveVerts += 1
}

Some((newVert, newMsgs))
}
}.persist(storageLevel)

// Force evaluation of processed RDD for accurate performance measurements
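
The Bagel hunk above can no longer pattern-match the combiner collection as a `Seq` once the grouped values are exposed as iterators. A minimal standalone sketch of the new "zero or one combiner" extraction, with `Int` standing in for the combiner type `C`:

```scala
// Sketch of the extraction in Bagel.comp above (toy types, not the Bagel ones).
val combiners: Iterable[Int] = Seq(42)
val c: Iterator[Int] = combiners.iterator

// Before (Seq):  c match { case Seq(comb) => Some(comb); case Seq() => None }
// After (Iterator): only hasNext/next are available.
val maybeCombiner: Option[Int] = if (c.hasNext) Some(c.next()) else None
```

Matching on `c.hasNext` with `case true`/`case false`, as the diff does, behaves the same as the plain `if` shown here; the `if` form is the more idiomatic Scala.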
36 changes: 19 additions & 17 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD into `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

/**
@@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
def groupByKey(): JavaPairRDD[K, JList[V]] =
def groupByKey(): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))

/**
@@ -462,55 +463,55 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (JList[V], JList[W])] =
: JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (JList[V], JList[W])] =
: JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))

/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))

/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
: JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))

/**
@@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

object JavaPairRDD {
private[spark]
def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
}

private[spark]
def cogroupResultToJava[K: ClassTag, V, W](
rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
}

private[spark]
def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
: RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
rddToPairRDDFunctions(rdd)
.mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
.mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}

def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
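
The helpers above swap `seqAsJavaList` for `asJavaIterable`, both from `scala.collection.JavaConversions` (current in the Scala 2.10 era this PR targets; later code bases prefer `JavaConverters`). A small sketch of that conversion in isolation:

```scala
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions.asJavaIterable

// Wrap a Scala Iterable as a java.lang.Iterable, as groupByResultToJava and
// the cogroup helpers above do for each tuple element.
val scalaValues: Iterable[String] = Seq("a", "b", "c")
val javaValues: JIterable[String] = asJavaIterable(scalaValues)

val it = javaValues.iterator()
while (it.hasNext) println(it.next())
```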
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,7 @@

package org.apache.spark.api.java

import java.util.{Comparator, Iterator => JIterator, List => JList}
import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
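
For Java-API callers, the visible change in these `groupBy` signatures is the value type moving from `java.util.List` to `java.lang.Iterable`: indexed access goes away, iteration remains. A sketch of the difference using plain Java collections, no Spark involved:

```scala
import java.lang.{Iterable => JIterable}
import java.util.{Arrays, List => JList}

val values: JList[String] = Arrays.asList("a", "b")
println(values.get(0))  // before this PR: callers could index into the JList

// java.util.List extends java.lang.Iterable, so the new type still iterates:
val iterable: JIterable[String] = values
val it = iterable.iterator()
while (it.hasNext) println(it.next())
```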
39 changes: 21 additions & 18 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
bufs.mapValues(_.toIterable)
}

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD into `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
}

@@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
for (v <- vs; w <- ws) yield (v, w)
}
}

@@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (ws.isEmpty) {
vs.iterator.map(v => (v, None))
vs.map(v => (v, None))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
for (v <- vs; w <- ws) yield (v, Some(w))
}
}
}
@@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty) {
ws.iterator.map(w => (None, w))
ws.map(w => (None, w))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
for (v <- vs; w <- ws) yield (Some(v), w)
}
}
}
@@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
def groupByKey(): RDD[(K, Seq[V])] = {
def groupByKey(): RDD[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(self))
}

@@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -468,21 +469,23 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
(vs.asInstanceOf[Seq[V]],
w1s.asInstanceOf[Seq[W1]],
w2s.asInstanceOf[Seq[W2]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}

@@ -491,15 +494,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, new HashPartitioner(numPartitions))
}

@@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/** Alias for cogroup. */
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}

/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

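
Note how the join family above drops the explicit `.iterator` calls: an `Iterable` comprehension can be evaluated again if the values are reused, where an `Iterator` is spent after one pass. The cross product it computes, sketched over plain collections:

```scala
// Standalone sketch of the join body: for each key, pair every value
// from `this` (vs) with every value from `other` (ws).
val vs: Iterable[Int]    = Seq(1, 2)
val ws: Iterable[String] = Seq("x", "y")

val joined = for (v <- vs; w <- ws) yield (v, w)
println(joined) // List((1,x), (1,y), (2,x), (2,y))
```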
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items.
*/
def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))

/**
* Return an RDD of grouped items.
*/
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
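
Finally, a short usage sketch of the changed `groupBy` from an application's point of view, assuming a local-mode `SparkContext` and the Spark 1.0-era API this PR targets:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // pair-RDD implicits of this era

val sc = new SparkContext("local", "groupBy-sketch")
val nums = sc.parallelize(1 to 10)

// Values now arrive as Iterable[Int] rather than Seq[Int]:
val byParity = nums.groupBy(_ % 2)
byParity.collect().foreach { case (k, vs) => println(s"$k -> ${vs.toList}") }

sc.stop()
```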