From c9711659cfde374a97e218c4069a22b8923b00dc Mon Sep 17 00:00:00 2001
From: nitin
Date: Wed, 28 May 2014 00:46:59 -0700
Subject: [PATCH 1/6] Use optional third argument as edge attribute.

---
 .../org/apache/spark/graphx/GraphLoader.scala | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 389490c13984..f1ee4a608eec 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -26,8 +26,9 @@ import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 object GraphLoader extends Logging {
 
   /**
-   * Loads a graph from an edge list formatted file where each line contains two integers: a source
-   * id and a target id. Skips lines that begin with `#`.
+   * Loads a graph from an edge list formatted file where each line contains at least two integers: a source
+   * id and a target id. If a third integer is provided, it is used as the edge attribute; the default edge attribute is 1.
+   * Skips lines that begin with `#`.
    *
    * If desired the edges can be automatically oriented in the positive
    * direction (source Id < target Id) by setting `canonicalOrientation` to
@@ -36,10 +37,10 @@ object GraphLoader extends Logging {
    * @example Loads a file in the following format:
    * {{{
    * # Comment Line
-   * # Source Id <\t> Target Id
-   * 1   -5
-   * 1    2
-   * 2    7
+   * # Source Id <\t> Target Id [<\t> Edge attribute]
+   * 1   -5   100
+   * 1    2   200
+   * 2    7
    * 1    8
    * }}}
    *
@@ -70,10 +71,13 @@ object GraphLoader extends Logging {
           }
           val srcId = lineArray(0).toLong
           val dstId = lineArray(1).toLong
+          val edgeAttr =
+            if (lineArray.length >= 3) lineArray(2).toInt
+            else 1
           if (canonicalOrientation && srcId > dstId) {
-            builder.add(dstId, srcId, 1)
+            builder.add(dstId, srcId, edgeAttr)
           } else {
-            builder.add(srcId, dstId, 1)
+            builder.add(srcId, dstId, edgeAttr)
           }
         }
       }
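
A usage sketch of the behavior this patch introduces (illustrative, not part of the patch; the file name and the spark-shell context `sc` are assumed):

    // edges.txt -- hypothetical input; the third column is optional:
    //   1    2    100
    //   2    3
    import org.apache.spark.graphx.GraphLoader

    val graph = GraphLoader.edgeListFile(sc, "edges.txt")
    // Edge (1, 2) carries the parsed attribute 100; edge (2, 3) defaults to 1.
    graph.edges.collect().foreach(e => println(e.srcId + " -> " + e.dstId + ": " + e.attr))
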
From 86dc34e2bc3307383da398df7ef8418695d92b4e Mon Sep 17 00:00:00 2001
From: nitin
Date: Wed, 28 May 2014 12:05:54 -0700
Subject: [PATCH 2/6] Make edge attribute type generic

---
 .../scala/org/apache/spark/graphx/GraphLoader.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index f1ee4a608eec..f4d89c34d931 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -27,7 +27,7 @@ object GraphLoader extends Logging {
 
   /**
    * Loads a graph from an edge list formatted file where each line contains at least two integers: a source
-   * id and a target id. If a third integer is provided, it is used as the edge attribute; the default edge attribute is 1.
+   * id and a target id. If a third argument is provided, it is used as the edge attribute; the default edge attribute is 1.
    * Skips lines that begin with `#`.
    *
    * If desired the edges can be automatically oriented in the positive
@@ -50,7 +50,7 @@ object GraphLoader extends Logging {
    *        direction
    * @param minEdgePartitions the number of partitions for the edge RDD
    */
-  def edgeListFile(
+  def edgeListFile[ED: ClassTag](
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
       minEdgePartitions: Int = 1)
@@ -62,7 +62,7 @@ object GraphLoader extends Logging {
     // Parse the edge data table directly into edge partitions
     val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int, Int]
+      val builder = new EdgePartitionBuilder[ED, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -71,8 +71,8 @@ object GraphLoader extends Logging {
           }
           val srcId = lineArray(0).toLong
           val dstId = lineArray(1).toLong
-          val edgeAttr =
-            if (lineArray.length >= 3) lineArray(2).toInt
+          val edgeAttr: ED =
+            if (lineArray.length >= 3) lineArray(2)
             else 1
           if (canonicalOrientation && srcId > dstId) {
             builder.add(dstId, srcId, edgeAttr)

From f000863961c760304730aaa802b500040869a99b Mon Sep 17 00:00:00 2001
From: nitin
Date: Wed, 28 May 2014 12:55:08 -0700
Subject: [PATCH 3/6] Made edge attribute type generic and updated the signature in the callers

---
 .../scala/org/apache/spark/graphx/GraphLoader.scala   | 10 ++++++----
 .../scala/org/apache/spark/graphx/lib/Analytics.scala |  6 +++---
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index f4d89c34d931..32bf8e89eeed 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 
@@ -55,7 +57,7 @@ object GraphLoader extends Logging {
       path: String,
       canonicalOrientation: Boolean = false,
       minEdgePartitions: Int = 1)
-    : Graph[Int, Int] =
+    : Graph[Int, ED] =
   {
     val startTime = System.currentTimeMillis
 
@@ -72,8 +74,8 @@ object GraphLoader extends Logging {
           val srcId = lineArray(0).toLong
           val dstId = lineArray(1).toLong
           val edgeAttr: ED =
-            if (lineArray.length >= 3) lineArray(2)
-            else 1
+            if (lineArray.length >= 3) lineArray(2).asInstanceOf[ED]
+            else 1.asInstanceOf[ED]
           if (canonicalOrientation && srcId > dstId) {
             builder.add(dstId, srcId, edgeAttr)
           } else {
@@ -87,7 +89,7 @@ object GraphLoader extends Logging {
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
 
-    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+    GraphImpl.fromEdgePartitions[Int, ED](edges, defaultVertexAttr = 1)
   } // end of edgeListFile
 
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 069e042ed94a..7e4a4b9c5be5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -80,7 +80,7 @@ object Analytics extends Logging {
 
         val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
-        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+        val unpartitionedGraph: Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname,
           minEdgePartitions = numEPart).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
@@ -126,7 +126,7 @@ object Analytics extends Logging {
         println("======================================")
 
         val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
-        val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+        val unpartitionedGraph: Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname,
           minEdgePartitions = numEPart).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
@@ -148,7 +148,7 @@ object Analytics extends Logging {
         println("| Triangle Count |")
         println("======================================")
         val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-        val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+        val graph: Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname, canonicalOrientation = true,
           minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
         val triangles = TriangleCount.run(graph)
         println("Triangles: " + triangles.vertices.map {
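
With the type parameter in place, callers name the edge attribute type explicitly, as the Analytics changes above do; `ED` appears only in the return type, so it cannot be inferred from the arguments. A minimal caller sketch (illustrative path and partition count, not part of the patch):

    import org.apache.spark.graphx.{Graph, GraphLoader}

    // The explicit [Int] fixes the edge attribute type of the loaded graph.
    val graph: Graph[Int, Int] =
      GraphLoader.edgeListFile[Int](sc, "hdfs:///data/edges.txt", minEdgePartitions = 4).cache()

One caveat worth noting: `lineArray(2).asInstanceOf[ED]` only defers the type check, since the parsed token is still a `String`; for `ED = Int` the mismatch surfaces as a `ClassCastException` at runtime. A caller-supplied parse function of type `String => ED` would be the type-safe alternative.
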
From 68d75298b675c628197f0b25273e75f23c5ef59f Mon Sep 17 00:00:00 2001
From: nitin
Date: Sun, 1 Jun 2014 16:50:40 -0700
Subject: [PATCH 4/6] Make vertex serializable

---
 graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8b910fbc5a42..5b2f52b7b3ec 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -57,7 +57,7 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
  */
 class VertexRDD[@specialized VD: ClassTag](
     val partitionsRDD: RDD[ShippableVertexPartition[VD]])
-  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) extends Serializable {
 
   require(partitionsRDD.partitioner.isDefined)
 

From 8af9e697903c8b4c16f2a57df7f71627674cf1cb Mon Sep 17 00:00:00 2001
From: nitin
Date: Sun, 1 Jun 2014 17:14:38 -0700
Subject: [PATCH 5/6] Make vertex Serializable so that it can be used with _SER storage

---
 .../src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 5b2f52b7b3ec..3a737c08ed68 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -57,9 +57,12 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
  */
 class VertexRDD[@specialized VD: ClassTag](
     val partitionsRDD: RDD[ShippableVertexPartition[VD]])
-  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) extends Serializable {
+  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) with Serializable {
 
   require(partitionsRDD.partitioner.isDefined)
+
+  /** Default constructor is provided to support serialization */
+  protected def this() = this(null)
 
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
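
Patch 4's double `extends` does not compile; this patch replaces it with the `with Serializable` mixin and adds a no-arg constructor for serializers that require one. The target use case, per the commit message, is serialized storage. A sketch of that usage (illustrative; assumes a graph loaded as above):

    import org.apache.spark.storage.StorageLevel

    // Keep vertex and edge partitions in memory in serialized form
    // instead of as deserialized Java objects.
    graph.vertices.persist(StorageLevel.MEMORY_ONLY_SER)
    graph.edges.persist(StorageLevel.MEMORY_ONLY_SER)
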
From 3bb103fc25790573144365eb06c0a0691aca460f Mon Sep 17 00:00:00 2001
From: nitin
Date: Sun, 1 Jun 2014 18:01:41 -0700
Subject: [PATCH 6/6] Register VertexRDD

---
 .../scala/org/apache/spark/graphx/GraphKryoRegistrator.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index f97f329c0e83..ef8429b5f262 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -40,6 +40,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
     kryo.register(classOf[EdgePartition[Object, Object]])
+    kryo.register(classOf[VertexRDD[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
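
Registering `VertexRDD` spares Kryo from writing the full class name with every serialized instance, and is mandatory once `spark.kryo.registrationRequired` is enabled. A minimal sketch of wiring the registrator into a job configuration (illustrative app name; the three properties are standard Spark settings):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("GraphXKryoExample")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
      .set("spark.kryo.registrationRequired", "true")
    val sc = new SparkContext(conf)
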