diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index f97f329c0e832..ef8429b5f2628 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -40,6 +40,7 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[RoutingTableMessage]) kryo.register(classOf[(VertexId, Object)]) kryo.register(classOf[EdgePartition[Object, Object]]) + kryo.register(classOf[VertexRDD[Object]]) kryo.register(classOf[BitSet]) kryo.register(classOf[VertexIdToIndexMap]) kryo.register(classOf[VertexAttributeBlock[Object]]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala index 389490c139848..32bf8e89eeedb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx +import scala.reflect.ClassTag + import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl} @@ -26,8 +28,9 @@ import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl} object GraphLoader extends Logging { /** - * Loads a graph from an edge list formatted file where each line contains two integers: a source - * id and a target id. Skips lines that begin with `#`. + * Loads a graph from an edge list formatted file where each line contains at least two integers: a source + * id and a target id. If a third value is provided, it is used as the edge attribute; the default edge attribute is 1. + * Skips lines that begin with `#`. 
* * If desired the edges can be automatically oriented in the positive * direction (source Id < target Id) by setting `canonicalOrientation` to @@ -36,10 +39,10 @@ object GraphLoader extends Logging { * @example Loads a file in the following format: * {{{ * # Comment Line - * # Source Id <\t> Target Id - * 1 -5 - * 1 2 - * 2 7 + * # Source Id <\t> Target Id [<\t> Edge attribute] + * 1 -5 100 + * 1 2 200 + * 2 7 * 1 8 * }}} * @@ -49,19 +52,19 @@ object GraphLoader extends Logging { * direction * @param minEdgePartitions the number of partitions for the edge RDD */ - def edgeListFile( + def edgeListFile[ED: ClassTag]( sc: SparkContext, path: String, canonicalOrientation: Boolean = false, minEdgePartitions: Int = 1) - : Graph[Int, Int] = + : Graph[Int, ED] = { val startTime = System.currentTimeMillis // Parse the edge data table directly into edge partitions val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions) val edges = lines.mapPartitionsWithIndex { (pid, iter) => - val builder = new EdgePartitionBuilder[Int, Int] + val builder = new EdgePartitionBuilder[ED, Int] iter.foreach { line => if (!line.isEmpty && line(0) != '#') { val lineArray = line.split("\\s+") @@ -70,10 +73,13 @@ object GraphLoader extends Logging { } val srcId = lineArray(0).toLong val dstId = lineArray(1).toLong + val edgeAttr : ED = + if (lineArray.length >= 3) lineArray(2).asInstanceOf[ED] + else 1.asInstanceOf[ED] if (canonicalOrientation && srcId > dstId) { - builder.add(dstId, srcId, 1) + builder.add(dstId, srcId, edgeAttr) } else { - builder.add(srcId, dstId, 1) + builder.add(srcId, dstId, edgeAttr) } } } @@ -83,7 +89,7 @@ object GraphLoader extends Logging { logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) - GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1) + GraphImpl.fromEdgePartitions[Int, ED](edges, defaultVertexAttr = 1) } // end of edgeListFile } diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 8b910fbc5a423..3a737c08ed680 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -57,9 +57,12 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._ */ class VertexRDD[@specialized VD: ClassTag]( val partitionsRDD: RDD[ShippableVertexPartition[VD]]) - extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) with Serializable { require(partitionsRDD.partitioner.isDefined) + + /** Default constructor is provided to support serialization */ + protected def this() = this(null) /** * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index 069e042ed94a3..7e4a4b9c5be51 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -80,7 +80,7 @@ object Analytics extends Logging { val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) - val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + val unpartitionedGraph : Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname, minEdgePartitions = numEPart).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) @@ -126,7 +126,7 @@ object Analytics extends Logging { println("======================================") val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) - val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + val unpartitionedGraph : Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname, 
minEdgePartitions = numEPart).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) @@ -148,7 +148,7 @@ object Analytics extends Logging { println("| Triangle Count |") println("======================================") val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) - val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, + val graph : Graph[Int, Int]= GraphLoader.edgeListFile[Int](sc, fname, canonicalOrientation = true, minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() val triangles = TriangleCount.run(graph) println("Triangles: " + triangles.vertices.map {