
Commit 09e7aa4

ankurdave authored and mateiz committed
SPARK-1786: Edge Partition Serialization
This appears to address the issue with edge partition serialization. The solution appears to be
just registering the `PrimitiveKeyOpenHashMap`. However I noticed that we appear to have forked
that code in GraphX but retained the same name (which is confusing). I also renamed our local
copy to `GraphXPrimitiveKeyOpenHashMap`. We should consider dropping that and using the one in
Spark if possible.

Author: Ankur Dave <[email protected]>
Author: Joseph E. Gonzalez <[email protected]>

Closes #724 from jegonzal/edge_partition_serialization and squashes the following commits:

b0a525a [Ankur Dave] Disable reference tracking to fix serialization test
bb7f548 [Ankur Dave] Add failing test for EdgePartition Kryo serialization
67dac22 [Joseph E. Gonzalez] Making EdgePartition serializable.

(cherry picked from commit a6b02fb)
Signed-off-by: Matei Zaharia <[email protected]>
1 parent f84b798 commit 09e7aa4
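For context, Kryo only comes into play when the job is configured to use it. A minimal sketch of the configuration a GraphX user would set, assuming standard Spark 1.x property names (the application name is made up):

import org.apache.spark.SparkConf

// Sketch only: enable Kryo and point it at the GraphX registrator, which after
// this commit also registers GraphXPrimitiveKeyOpenHashMap and the OpenHashSet
// types referenced by EdgePartition.
val conf = new SparkConf()
  .setAppName("graphx-kryo-demo") // hypothetical application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
  // The squashed commit b0a525a disables reference tracking for the test; a job
  // can do the same with this setting.
  .set("spark.kryo.referenceTracking", "false")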

File tree

11 files changed: +44 -23 lines


graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

Lines changed: 6 additions & 3 deletions

@@ -24,6 +24,9 @@ import org.apache.spark.util.BoundedPriorityQueue
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.OpenHashSet
+
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -43,8 +43,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[PartitionStrategy])
     kryo.register(classOf[BoundedPriorityQueue[Object]])
     kryo.register(classOf[EdgeDirection])
-
-    // This avoids a large number of hash table lookups.
-    kryo.setReferences(false)
+    kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
+    kryo.register(classOf[OpenHashSet[Int]])
+    kryo.register(classOf[OpenHashSet[Long]])
   }
 }
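To make the registration calls above concrete, here is a small sketch in plain Kryo (outside Spark); the registered type and the helper are illustrative, not GraphX code. Registering a class up front lets Kryo write a compact numeric id instead of the full class name, which is what the new kryo.register(...) calls buy for the hash map and hash set types.

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

val kryo = new Kryo()
kryo.register(classOf[Array[Long]]) // analogous to the register(...) calls above

// Hypothetical helper: serialize a value to bytes and read it back.
def roundTrip[T](value: T): T = {
  val buffer = new ByteArrayOutputStream()
  val out = new Output(buffer)
  kryo.writeClassAndObject(out, value)
  out.close()
  kryo.readClassAndObject(new Input(buffer.toByteArray)).asInstanceOf[T]
}

// e.g. roundTrip(Array(1L, 2L, 3L)) should come back element-for-element equal.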

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala

Lines changed: 7 additions & 7 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 private[graphx]
 class EdgePartition[
     @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
-    @transient val srcIds: Array[VertexId],
-    @transient val dstIds: Array[VertexId],
-    @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
-    @transient val vertices: VertexPartition[VD],
-    @transient val activeSet: Option[VertexSet] = None
+    val srcIds: Array[VertexId] = null,
+    val dstIds: Array[VertexId] = null,
+    val data: Array[ED] = null,
+    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
+    val vertices: VertexPartition[VD] = null,
+    val activeSet: Option[VertexSet] = None
   ) extends Serializable {
 
   /** Return a new `EdgePartition` with the specified edge data. */
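The substantive change above is dropping @transient from the constructor fields, with null defaults presumably added so an empty instance can be constructed during deserialization. A small illustrative sketch, not GraphX code, of why @transient matters when an object is serialized:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical example class: a @transient field is skipped when the object is
// written out and comes back as null, while an ordinary val is serialized.
class Payload(
    val kept: Array[Long] = null,
    @transient val dropped: Array[Long] = null)
  extends Serializable

def copyViaJavaSerialization[T](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

val copy = copyViaJavaSerialization(new Payload(Array(1L, 2L), Array(3L)))
// copy.kept contains Array(1L, 2L); copy.dropped is null.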

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala

Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ import scala.util.Sorting
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx]
 class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
     val srcIds = new Array[VertexId](edgeArray.size)
     val dstIds = new Array[VertexId](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
-    val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
+    val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
     // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
     // adding them to the index
     if (edgeArray.length > 0) {

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala

Lines changed: 1 addition & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * The Iterator type returned when constructing edge triplets. This could be an anonymous class in

graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala

Lines changed: 2 additions & 2 deletions

@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
       : Iterator[RoutingTableMessage] = {
     // Determine which positions each vertex id appears in using a map where the low 2 bits
     // represent src and dst
-    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
     edgePartition.srcIds.iterator.foreach { srcId =>
       map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
     }
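The second hunk records, for each vertex id, whether it appears as a source and/or a destination by OR-ing flags into the low two bits of a Byte. A self-contained sketch of that flag scheme, using a plain Scala map and made-up vertex ids:

import scala.collection.mutable

// Bit 0 = "appears as a source", bit 1 = "appears as a destination".
val positions = mutable.Map.empty[Long, Byte].withDefaultValue(0: Byte)

def markSrc(vid: Long): Unit = positions(vid) = (positions(vid) | 0x1).toByte
def markDst(vid: Long): Unit = positions(vid) = (positions(vid) | 0x2).toByte

markSrc(7L); markDst(7L); markSrc(9L)
// positions(7L) == 0x3 (both src and dst), positions(9L) == 0x1 (src only)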

graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /** Stores vertex attributes to ship to an edge partition. */
 private[graphx]

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartition {
   /** Construct a `VertexPartition` from the given vertices. */

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala

Lines changed: 3 additions & 3 deletions

@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartitionBase {
   /**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
    */
  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
    : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map(pair._1) = pair._2
     }
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
    */
  def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
    : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map.setMerge(pair._1, pair._2, mergeFunc)
     }
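The two initFrom overloads above differ in how duplicate vertex ids are handled: the plain update in the first means the last value seen wins, while setMerge in the second combines duplicates with mergeFunc. A short illustrative sketch of that contrast with hypothetical input (not the GraphX types):

// Duplicate key 1L appears twice in the input.
val pairs = Seq((1L, 10), (2L, 5), (1L, 32))

// First overload (plain update): last write wins -> Map(1L -> 32, 2L -> 5)
val overwritten = pairs.foldLeft(Map.empty[Long, Int]) {
  case (m, (k, v)) => m.updated(k, v)
}

// Second overload (merge, here with _ + _): duplicates combine -> Map(1L -> 42, 2L -> 5)
val merged = pairs.foldLeft(Map.empty[Long, Int]) {
  case (m, (k, v)) => m.updated(k, m.get(k).map(_ + v).getOrElse(v))
}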

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala

Lines changed: 2 additions & 2 deletions

@@ -25,7 +25,7 @@ import org.apache.spark.Logging
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * An class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
   * Construct a new VertexPartition whose index contains only the vertices in the mask.
   */
  def reindex(): Self[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
    val arbitraryMerge = (a: VD, b: VD) => a
    for ((k, v) <- self.iterator) {
      hashMap.setMerge(k, v, arbitraryMerge)

graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala renamed to graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ import scala.reflect._
  * Under the hood, it uses our OpenHashSet implementation.
  */
 private[graphx]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
     @specialized(Long, Int, Double) V: ClassTag](
     val keySet: OpenHashSet[K], var _values: Array[V])
   extends Iterable[(K, V)]
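Only the class name changes in this file: keys still live in an OpenHashSet and values in a parallel array, as the constructor above shows. A short usage sketch, assuming only the API already visible elsewhere in this diff (update, changeValue, setMerge) and a made-up variable name:

import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

val degreeByVertex = new GraphXPrimitiveKeyOpenHashMap[Long, Int]
degreeByVertex(42L) = 1                           // plain insert/overwrite
degreeByVertex.changeValue(42L, 1, _ + 1)         // insert default, or update existing in place
degreeByVertex.setMerge(7L, 3, (a, b) => a max b) // insert, or merge with the existing value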
