graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -40,6 +40,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
     kryo.register(classOf[EdgePartition[Object, Object]])
+    kryo.register(classOf[VertexRDD[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
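The registrator only takes effect when Spark is configured to use Kryo. A minimal sketch of the driver-side wiring, assuming a local master and an illustrative app name:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: switch Spark to Kryo and point it at the GraphX
// registrator so VertexRDD and friends pick up the registrations above.
val conf = new SparkConf()
  .setMaster("local[2]")                 // assumption: local test run
  .setAppName("kryo-registration-demo")  // illustrative name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
val sc = new SparkContext(conf)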
graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 
@@ -26,8 +28,9 @@ import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 object GraphLoader extends Logging {
 
   /**
-   * Loads a graph from an edge list formatted file where each line contains two integers: a source
-   * id and a target id. Skips lines that begin with `#`.
+   * Loads a graph from an edge list formatted file where each line contains at least two integers:
+   * a source id and a target id. If a third column is present, it is used as the edge attribute;
+   * otherwise the edge attribute defaults to 1. Skips lines that begin with `#`.
    *
    * If desired the edges can be automatically oriented in the positive
    * direction (source Id < target Id) by setting `canonicalOrientation` to
@@ -36,10 +39,10 @@ object GraphLoader extends Logging {
    * @example Loads a file in the following format:
    * {{{
    * # Comment Line
-   * # Source Id <\t> Target Id
-   * 1 -5
-   * 1 2
-   * 2 7
+   * # Source Id <\t> Target Id [<\t> Edge attribute]
+   * 1 -5 100
+   * 1 2 200
+   * 2 7
    * 1 8
    * }}}
    *
@@ -49,19 +52,19 @@
    * direction
    * @param minEdgePartitions the number of partitions for the edge RDD
    */
-  def edgeListFile(
+  def edgeListFile[ED: ClassTag](
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
       minEdgePartitions: Int = 1)
-    : Graph[Int, Int] =
+    : Graph[Int, ED] =
   {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
     val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int, Int]
+      val builder = new EdgePartitionBuilder[ED, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -70,10 +73,13 @@
           }
           val srcId = lineArray(0).toLong
           val dstId = lineArray(1).toLong
+          val edgeAttr: ED =
+            if (lineArray.length >= 3) lineArray(2).asInstanceOf[ED]
+            else 1.asInstanceOf[ED]
           if (canonicalOrientation && srcId > dstId) {
-            builder.add(dstId, srcId, 1)
+            builder.add(dstId, srcId, edgeAttr)
           } else {
-            builder.add(srcId, dstId, 1)
+            builder.add(srcId, dstId, edgeAttr)
           }
         }
       }
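Note that `lineArray(2)` is a `String` and `asInstanceOf[ED]` is an unchecked cast, not a conversion: with a numeric `ED` such as `Int`, the cast is erased at compile time and the string would only blow up later, when the attribute is first used as an `Int`. A conversion-based sketch, using a hypothetical `parseAttr` helper and a caller-supplied parser (neither is part of this patch):

// Hypothetical helper: convert the optional third column instead of casting
// it, falling back to a default when the column is absent.
def parseAttr[ED](token: Option[String], parse: String => ED, default: ED): ED =
  token.map(parse).getOrElse(default)

// e.g. inside the loop above:
// val edgeAttr = parseAttr(lineArray.lift(2), _.toInt, 1)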
@@ -83,7 +89,7 @@
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
 
-    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+    GraphImpl.fromEdgePartitions[Int, ED](edges, defaultVertexAttr = 1)
   } // end of edgeListFile
 
 }
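For reference, a sketch of a call site for the extended loader; the path and partition count are illustrative:

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, GraphLoader}

// Hypothetical call site: ED appears only in the result type, so it cannot
// be inferred from the arguments and must be pinned explicitly. A two-column
// file yields the default attribute 1, so ED = Int is safe here.
def load(sc: SparkContext): Graph[Int, Int] =
  GraphLoader.edgeListFile[Int](sc, "data/edges.tsv", minEdgePartitions = 4)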
graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -57,9 +57,12 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
  */
 class VertexRDD[@specialized VD: ClassTag](
     val partitionsRDD: RDD[ShippableVertexPartition[VD]])
-  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+  extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) with Serializable {
 
   require(partitionsRDD.partitioner.isDefined)
 
+  /** Default constructor is provided to support serialization */
+  protected def this() = this(null)
+
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
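As background for the no-arg constructor: Kryo's default FieldSerializer creates an instance through a zero-argument constructor and then populates its fields, so a class whose only constructor takes parameters cannot be deserialized without one (or a fallback instantiator strategy). A minimal sketch of the same pattern on a hypothetical class:

// Hypothetical example: the primary constructor is for normal use; the
// protected no-arg constructor exists only so serialization frameworks that
// instantiate via a zero-arg constructor can create the object before
// filling in its fields.
class Payload(val values: Array[Long]) extends Serializable {
  protected def this() = this(null)
}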
graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -80,7 +80,7 @@ object Analytics extends Logging {
 
     val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
-    val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+    val unpartitionedGraph: Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname,
       minEdgePartitions = numEPart).cache()
     val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
@@ -126,7 +126,7 @@
     println("======================================")
 
     val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
-    val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+    val unpartitionedGraph: Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname,
       minEdgePartitions = numEPart).cache()
     val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
@@ -148,7 +148,7 @@
     println("| Triangle Count |")
     println("======================================")
     val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-    val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
+    val graph: Graph[Int, Int] = GraphLoader.edgeListFile[Int](sc, fname, canonicalOrientation = true,
       minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
     val triangles = TriangleCount.run(graph)
     println("Triangles: " + triangles.vertices.map {
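Since `ED` cannot be inferred from the arguments, each call site has to fix it somehow; the updated drivers above pin it on both sides, though either side alone would do. A sketch of the two equivalent forms, reusing the surrounding `sc`, `fname`, and `numEPart`:

// Either form fixes ED = Int; the call sites above use both at once.
val byParameter = GraphLoader.edgeListFile[Int](sc, fname, minEdgePartitions = numEPart)
val byAnnotation: Graph[Int, Int] = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart)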