71 changes: 39 additions & 32 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val doubleRing = ring ++ ring
val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
assert(graph.edges.count() === doubleRing.size)
assert(graph.edges.collect.forall(e => e.attr == 1))
assert(graph.edges.collect().forall(e => e.attr == 1))

// uniqueEdges option should uniquify edges and store duplicate count in edge attributes
val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
assert(uniqueGraph.edges.count() === ring.size)
assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
assert(uniqueGraph.edges.collect().forall(e => e.attr == 2))
}
}

@@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
assert( graph.vertices.count() === 100)
graph.triplets.collect.map { et =>
graph.triplets.collect().map { et =>
assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
}
@@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
(1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect().toSet
=== (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
}
}

test("partitionBy") {
withSpark { sc =>
def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
def nonemptyParts(graph: Graph[Int, Int]) = {
def mkGraph(edges: List[(Long, Long)]): Graph[Int, Int] = {
Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
}
def nonemptyParts(graph: Graph[Int, Int]): RDD[List[Edge[Int]]] = {
graph.edges.partitionsRDD.mapPartitions { iter =>
Iterator(iter.next()._2.iterator.toList)
}.filter(_.nonEmpty)
@@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
// partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
// the same partition
assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
assert(
nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
// partitionBy(EdgePartition2D) puts identical edges in the same partition
assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)

@@ -140,10 +143,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val g = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ===
assert(g.triplets.collect().map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
val gPart = g.partitionBy(EdgePartition2D)
assert(gPart.triplets.collect.map(_.toTuple).toSet ===
assert(gPart.triplets.collect().map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
}
}
@@ -154,10 +157,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
assert(mappedVAttrs.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
assert(mappedVAttrs2.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
}
}

@@ -177,12 +180,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// Trigger initial vertex replication
graph0.triplets.foreach(x => {})
// Change type of replicated vertices, but preserve erased type
val graph1 = graph0.mapVertices {
case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
val graph1 = graph0.mapVertices { case (vid, integerOpt) =>
integerOpt.map((x: java.lang.Integer) => x.toDouble: java.lang.Double)
}
// Access replicated vertices, exposing the erased type
val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
assert(graph2.edges.map(_.attr).collect().toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
}
}

@@ -202,7 +205,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect().toSet ===
(1L to n).map(x => Edge(0, x, "vv")).toSet)
}
}
@@ -211,7 +214,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
assert(star.reverse.outDegrees.collect().toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
}
}

@@ -221,7 +224,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(result.collect.toSet === Set((1L, 2)))
assert(result.collect().toSet === Set((1L, 2)))
}
}

@@ -237,7 +240,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)

// And 4 edges.
assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
assert(subgraph.edges.map(_.copy()).collect().toSet ===
(2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}

@@ -273,9 +277,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
sc.parallelize((1 to n).flatMap(x =>
List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
assert(star2.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]))
assert(star2.vertices.collect().toSet === star.vertices.collect().toSet)
}
}

@@ -300,21 +304,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)

// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
newOpt.getOrElse(old)
}
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
if (et.srcId % 2 != 1) {
throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)

}
@@ -340,17 +346,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val reverseStar = starGraph(sc, n).reverse.cache()
// outerJoinVertices changing type
val reverseStarDegrees =
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
(vid, a, bOpt) => bOpt.getOrElse(0)
}
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
(a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
assert(newReverseStar.vertices.map(_._2).collect.toSet ===
assert(newReverseStar.vertices.map(_._2).collect().toSet ===
(0 to n).map(x => "v%d".format(x)).toSet)
}
}
Expand All @@ -361,7 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
val graph = Graph(verts, edges)
val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
.collect.toSet
.collect().toSet
assert(triplets ===
Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
}
@@ -417,7 +424,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val graph = Graph.fromEdgeTuples(edges, 1)
val neighborAttrSums = graph.mapReduceTriplets[Int](
et => Iterator((et.dstId, et.srcAttr)), _ + _)
assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {
sc.stop()
}
graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext
*/
trait LocalSparkContext {
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
def withSpark[T](f: SparkContext => T) = {
def withSpark[T](f: SparkContext => T): T = {
val conf = new SparkConf()
GraphXUtils.registerKryoClasses(conf)
val sc = new SparkContext("local", "test", conf)
26 changes: 13 additions & 13 deletions graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.StorageLevel

class VertexRDDSuite extends FunSuite with LocalSparkContext {

def vertices(sc: SparkContext, n: Int) = {
private def vertices(sc: SparkContext, n: Int) = {
VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
}

@@ -52,7 +52,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => (i.toLong, 0))).cache()
val vertexB = VertexRDD(sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1))).cache()
val vertexC = vertexA.minus(vertexB)
assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
}
}

@@ -62,7 +62,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB: RDD[(VertexId, Int)] =
sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1)).cache()
val vertexC = vertexA.minus(vertexB)
assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
}
}

@@ -72,7 +72,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB = VertexRDD(sc.parallelize(50 until 100, 2).map(i => (i.toLong, 1)))
assert(vertexA.partitions.size != vertexB.partitions.size)
val vertexC = vertexA.minus(vertexB)
assert(vertexC.map(_._1).collect.toSet === (0 until 50).toSet)
assert(vertexC.map(_._1).collect().toSet === (0 until 50).toSet)
}
}

@@ -106,7 +106,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
assert(vertexA.partitions.size != vertexB.partitions.size)
val vertexC = vertexA.diff(vertexB)
assert(vertexC.map(_._1).collect.toSet === (8 until 16).toSet)
assert(vertexC.map(_._1).collect().toSet === (8 until 16).toSet)
}
}

@@ -116,11 +116,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n).cache()
val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// leftJoin with another VertexRDD
assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
// leftJoin with an RDD
val evensRDD = evens.map(identity)
assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
}
}
@@ -134,7 +134,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
old - newOpt.getOrElse(0)
}
assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet == (1 to 99 by 2).toSet)
assert(vertexC.filter(v => v._2 != 0).map(_._1).collect().toSet == (1 to 99 by 2).toSet)
}
}

@@ -144,11 +144,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n).cache()
val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// innerJoin with another VertexRDD
assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet)
// innerJoin with an RDD
val evensRDD = evens.map(identity)
assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet ===
assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet) }
}

@@ -161,7 +161,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
old - newVal
}
assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet == (0 to 98 by 2).toSet)
assert(vertexC.filter(v => v._2 == 0).map(_._1).collect().toSet == (0 to 98 by 2).toSet)
}
}

@@ -171,7 +171,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n)
val messageTargets = (0 to n) ++ (0 to n by 2)
val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1)))
assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet ===
assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect().toSet ===
(0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet)
}
}
@@ -183,7 +183,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
// test merge function
assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
assert(rdd.collect().toSet == Set((0L, 0), (1L, 3), (2L, 9)))
}
}

graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -32,7 +32,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
val ccGraph = gridGraph.connectedComponents()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
assert(maxCCid === 0)
}
} // end of Grid connected components
@@ -42,16 +42,16 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
val ccGraph = gridGraph.connectedComponents()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
assert(maxCCid === 0)
}
} // end of Grid connected components


test("Chain Connected Components") {
withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val chain1 = (0 until 9).map(x => (x, x + 1))
val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
val ccGraph = twoChains.connectedComponents()
@@ -73,12 +73,12 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {

test("Reverse Chain Connected Components") {
withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val chain1 = (0 until 9).map(x => (x, x + 1))
val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse
val ccGraph = twoChains.connectedComponents()
val vertices = ccGraph.vertices.collect
val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
if (id < 10) {
assert(cc === 0)
@@ -120,9 +120,9 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
val ccGraph = graph.connectedComponents()
val vertices = ccGraph.vertices.collect
val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
assert(cc == 0)
assert(cc === 0)
}
}
} // end of toy connected components