@@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
3838 val doubleRing = ring ++ ring
3939 val graph = Graph .fromEdgeTuples(sc.parallelize(doubleRing), 1 )
4040 assert(graph.edges.count() === doubleRing.size)
41- assert(graph.edges.collect.forall(e => e.attr == 1 ))
41+ assert(graph.edges.collect() .forall(e => e.attr == 1 ))
4242
4343 // uniqueEdges option should uniquify edges and store duplicate count in edge attributes
4444 val uniqueGraph = Graph .fromEdgeTuples(sc.parallelize(doubleRing), 1 , Some (RandomVertexCut ))
4545 assert(uniqueGraph.edges.count() === ring.size)
46- assert(uniqueGraph.edges.collect.forall(e => e.attr == 2 ))
46+ assert(uniqueGraph.edges.collect() .forall(e => e.attr == 2 ))
4747 }
4848 }
4949
@@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
6464 assert( graph.edges.count() === rawEdges.size )
6565 // Vertices not explicitly provided but referenced by edges should be created automatically
6666 assert( graph.vertices.count() === 100 )
67- graph.triplets.collect.map { et =>
67+ graph.triplets.collect() .map { et =>
6868 assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && ! et.srcAttr))
6969 assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && ! et.dstAttr))
7070 }
@@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext {
7575 withSpark { sc =>
7676 val n = 5
7777 val star = starGraph(sc, n)
78- assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
79- (1 to n).map(x => (0 : VertexId , x : VertexId , " v" , " v" )).toSet)
78+ assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect() .toSet
79+ === (1 to n).map(x => (0 : VertexId , x : VertexId , " v" , " v" )).toSet)
8080 }
8181 }
8282
8383 test(" partitionBy" ) {
8484 withSpark { sc =>
85- def mkGraph (edges : List [(Long , Long )]) = Graph .fromEdgeTuples(sc.parallelize(edges, 2 ), 0 )
86- def nonemptyParts (graph : Graph [Int , Int ]) = {
85+ def mkGraph (edges : List [(Long , Long )]): Graph [Int , Int ] = {
86+ Graph .fromEdgeTuples(sc.parallelize(edges, 2 ), 0 )
87+ }
88+ def nonemptyParts (graph : Graph [Int , Int ]): RDD [List [Edge [Int ]]] = {
8789 graph.edges.partitionsRDD.mapPartitions { iter =>
8890 Iterator (iter.next()._2.iterator.toList)
8991 }.filter(_.nonEmpty)
@@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
102104 assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D )).count === 1 )
103105 // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
104106 // the same partition
105- assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut )).count === 1 )
107+ assert(
108+ nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut )).count === 1 )
106109 // partitionBy(EdgePartition2D) puts identical edges in the same partition
107110 assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D )).count === 1 )
108111
@@ -140,10 +143,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
140143 val g = Graph (
141144 sc.parallelize(List ((0L , " a" ), (1L , " b" ), (2L , " c" ))),
142145 sc.parallelize(List (Edge (0L , 1L , 1 ), Edge (0L , 2L , 1 )), 2 ))
143- assert(g.triplets.collect.map(_.toTuple).toSet ===
146+ assert(g.triplets.collect() .map(_.toTuple).toSet ===
144147 Set (((0L , " a" ), (1L , " b" ), 1 ), ((0L , " a" ), (2L , " c" ), 1 )))
145148 val gPart = g.partitionBy(EdgePartition2D )
146- assert(gPart.triplets.collect.map(_.toTuple).toSet ===
149+ assert(gPart.triplets.collect() .map(_.toTuple).toSet ===
147150 Set (((0L , " a" ), (1L , " b" ), 1 ), ((0L , " a" ), (2L , " c" ), 1 )))
148151 }
149152 }
@@ -154,10 +157,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
154157 val star = starGraph(sc, n)
155158 // mapVertices preserving type
156159 val mappedVAttrs = star.mapVertices((vid, attr) => attr + " 2" )
157- assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x : VertexId , " v2" )).toSet)
160+ assert(mappedVAttrs.vertices.collect() .toSet === (0 to n).map(x => (x : VertexId , " v2" )).toSet)
158161 // mapVertices changing type
159162 val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
160- assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x : VertexId , 1 )).toSet)
163+ assert(mappedVAttrs2.vertices.collect() .toSet === (0 to n).map(x => (x : VertexId , 1 )).toSet)
161164 }
162165 }
163166
@@ -177,12 +180,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
177180 // Trigger initial vertex replication
178181 graph0.triplets.foreach(x => {})
179182 // Change type of replicated vertices, but preserve erased type
180- val graph1 = graph0.mapVertices {
181- case (vid, integerOpt) => integerOpt .map((x : java.lang.Integer ) => ( x.toDouble) : java.lang.Double )
183+ val graph1 = graph0.mapVertices { case (vid, integerOpt) =>
184+ integerOpt.map((x : java.lang.Integer ) => x.toDouble: java.lang.Double )
182185 }
183186 // Access replicated vertices, exposing the erased type
184187 val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
185- assert(graph2.edges.map(_.attr).collect.toSet === Set [java.lang.Double ](1.0 , 2.0 , 3.0 ))
188+ assert(graph2.edges.map(_.attr).collect() .toSet === Set [java.lang.Double ](1.0 , 2.0 , 3.0 ))
186189 }
187190 }
188191
@@ -202,7 +205,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
202205 withSpark { sc =>
203206 val n = 5
204207 val star = starGraph(sc, n)
205- assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
208+ assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect() .toSet ===
206209 (1L to n).map(x => Edge (0 , x, " vv" )).toSet)
207210 }
208211 }
@@ -211,7 +214,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
211214 withSpark { sc =>
212215 val n = 5
213216 val star = starGraph(sc, n)
214- assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x : VertexId , 1 )).toSet)
217+ assert(star.reverse.outDegrees.collect() .toSet === (1 to n).map(x => (x : VertexId , 1 )).toSet)
215218 }
216219 }
217220
@@ -221,7 +224,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
221224 val edges : RDD [Edge [Int ]] = sc.parallelize(Array (Edge (1L , 2L , 0 )))
222225 val graph = Graph (vertices, edges).reverse
223226 val result = graph.mapReduceTriplets[Int ](et => Iterator ((et.dstId, et.srcAttr)), _ + _)
224- assert(result.collect.toSet === Set ((1L , 2 )))
227+ assert(result.collect() .toSet === Set ((1L , 2 )))
225228 }
226229 }
227230
@@ -237,7 +240,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
237240 assert(subgraph.vertices.collect().toSet === (0 to n by 2 ).map(x => (x, " v" )).toSet)
238241
239242 // And 4 edges.
240- assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2 ).map(x => Edge (0 , x, 1 )).toSet)
243+ assert(subgraph.edges.map(_.copy()).collect().toSet ===
244+ (2 to n by 2 ).map(x => Edge (0 , x, 1 )).toSet)
241245 }
242246 }
243247
@@ -273,9 +277,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
273277 sc.parallelize((1 to n).flatMap(x =>
274278 List ((0 : VertexId , x : VertexId ), (0 : VertexId , x : VertexId ))), 1 ), " v" )
275279 val star2 = doubleStar.groupEdges { (a, b) => a}
276- assert(star2.edges.collect.toArray.sorted(Edge .lexicographicOrdering[Int ]) ===
277- star.edges.collect.toArray.sorted(Edge .lexicographicOrdering[Int ]))
278- assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
280+ assert(star2.edges.collect() .toArray.sorted(Edge .lexicographicOrdering[Int ]) ===
281+ star.edges.collect() .toArray.sorted(Edge .lexicographicOrdering[Int ]))
282+ assert(star2.vertices.collect() .toSet === star.vertices.collect() .toSet)
279283 }
280284 }
281285
@@ -300,21 +304,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
300304 throw new Exception (" map ran on edge with dst vid %d, which is odd" .format(et.dstId))
301305 }
302306 Iterator ((et.srcId, 1 ))
303- }, (a : Int , b : Int ) => a + b, Some ((active, EdgeDirection .In ))).collect.toSet
307+ }, (a : Int , b : Int ) => a + b, Some ((active, EdgeDirection .In ))).collect() .toSet
304308 assert(numEvenNeighbors === (1 to n).map(x => (x : VertexId , n / 2 )).toSet)
305309
306310 // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
307- val ringEdges = sc.parallelize((0 until n).map(x => (x : VertexId , (x+ 1 ) % n : VertexId )), 3 )
311+ val ringEdges = sc.parallelize((0 until n).map(x => (x : VertexId , (x + 1 ) % n : VertexId )), 3 )
308312 val ring = Graph .fromEdgeTuples(ringEdges, 0 ) .mapVertices((vid, attr) => vid).cache()
309313 val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(- _).cache()
310- val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
314+ val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
315+ newOpt.getOrElse(old)
316+ }
311317 val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
312318 // Map function should only run on edges with source in the active set
313319 if (et.srcId % 2 != 1 ) {
314320 throw new Exception (" map ran on edge with src vid %d, which is even" .format(et.dstId))
315321 }
316322 Iterator ((et.dstId, 1 ))
317- }, (a : Int , b : Int ) => a + b, Some (changed, EdgeDirection .Out )).collect.toSet
323+ }, (a : Int , b : Int ) => a + b, Some (changed, EdgeDirection .Out )).collect() .toSet
318324 assert(numOddNeighbors === (2 to n by 2 ).map(x => (x : VertexId , 1 )).toSet)
319325
320326 }
@@ -340,17 +346,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
340346 val n = 5
341347 val reverseStar = starGraph(sc, n).reverse.cache()
342348 // outerJoinVertices changing type
343- val reverseStarDegrees =
344- reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0 ) }
349+ val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
350+ (vid, a, bOpt) => bOpt.getOrElse(0 )
351+ }
345352 val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
346353 et => Iterator ((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
347- (a : Int , b : Int ) => a + b).collect.toSet
354+ (a : Int , b : Int ) => a + b).collect() .toSet
348355 assert(neighborDegreeSums === Set ((0 : VertexId , n)) ++ (1 to n).map(x => (x : VertexId , 0 )))
349356 // outerJoinVertices preserving type
350357 val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
351358 val newReverseStar =
352359 reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse(" " ) }
353- assert(newReverseStar.vertices.map(_._2).collect.toSet ===
360+ assert(newReverseStar.vertices.map(_._2).collect() .toSet ===
354361 (0 to n).map(x => " v%d" .format(x)).toSet)
355362 }
356363 }
@@ -361,7 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
361368 val edges = sc.parallelize(List (Edge (1 , 2 , 0 ), Edge (2 , 1 , 0 )), 2 )
362369 val graph = Graph (verts, edges)
363370 val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
364- .collect.toSet
371+ .collect() .toSet
365372 assert(triplets ===
366373 Set ((1 : VertexId , 2 : VertexId , " a" , " b" ), (2 : VertexId , 1 : VertexId , " b" , " a" )))
367374 }
@@ -417,7 +424,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
417424 val graph = Graph .fromEdgeTuples(edges, 1 )
418425 val neighborAttrSums = graph.mapReduceTriplets[Int ](
419426 et => Iterator ((et.dstId, et.srcAttr)), _ + _)
420- assert(neighborAttrSums.collect.toSet === Set ((0 : VertexId , n)))
427+ assert(neighborAttrSums.collect() .toSet === Set ((0 : VertexId , n)))
421428 } finally {
422429 sc.stop()
423430 }
0 commit comments