From c55f52fffa79f0ee227367a555172f6cb4ce5cee Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Mon, 31 Mar 2014 12:58:05 +0200 Subject: [PATCH 1/7] Tests that reproduce the problems from SPARK-1188. --- .../graphx/impl/EdgePartitionSuite.scala | 14 +++--- .../impl/EdgeTripletIteratorSuite.scala | 43 +++++++++++++++++++ 2 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index e135d1d7ad6a3..4c5f0a911d2f9 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -34,8 +34,8 @@ class EdgePartitionSuite extends FunSuite { builder.add(e.srcId, e.dstId, e.attr) } val edgePartition = builder.toEdgePartition - assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges) - assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges) + assert(edgePartition.reverse.iterator.toList === reversedEdges) + assert(edgePartition.reverse.reverse.iterator.toList === edges) } test("map") { @@ -45,7 +45,7 @@ class EdgePartitionSuite extends FunSuite { builder.add(e.srcId, e.dstId, e.attr) } val edgePartition = builder.toEdgePartition - assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList === + assert(edgePartition.map(e => e.srcId + e.dstId).iterator.toList === edges.map(e => e.copy(attr = e.srcId + e.dstId))) } @@ -58,7 +58,7 @@ class EdgePartitionSuite extends FunSuite { builder.add(e.srcId, e.dstId, e.attr) } val edgePartition = builder.toEdgePartition - assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges) + assert(edgePartition.groupEdges(_ + _).iterator.toList === groupedEdges) } test("indexIterator") { @@ -72,8 +72,8 @@ class EdgePartitionSuite extends FunSuite { val edgePartition = builder.toEdgePartition assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges) - assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0) - assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1) + assert(edgePartition.indexIterator(_ == 0).toList === edgesFrom0) + assert(edgePartition.indexIterator(_ == 1).toList === edgesFrom1) } test("innerJoin") { @@ -87,7 +87,7 @@ class EdgePartitionSuite extends FunSuite { val a = makeEdgePartition(aList) val b = makeEdgePartition(bList) - assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList === + assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.toList === List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0))) } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala new file mode 100644 index 0000000000000..9cbb2d2acdc2d --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.graphx._ + +class EdgeTripletIteratorSuite extends FunSuite { + test("iterator.toList") { + val builder = new EdgePartitionBuilder[Int] + builder.add(1, 2, 0) + builder.add(1, 3, 0) + builder.add(1, 4, 0) + val vidmap = new VertexIdToIndexMap + vidmap.add(1) + vidmap.add(2) + vidmap.add(3) + vidmap.add(4) + val vs = Array.fill(vidmap.capacity)(0) + val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition) + val result = iter.toList.map(et => (et.srcId, et.dstId)) + assert(result === Seq((1, 2), (1, 3), (1, 4))) + } +} From 0182f2b329b2bb6e6ca8c41245f09db83b71908b Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Mon, 31 Mar 2014 12:58:37 +0200 Subject: [PATCH 2/7] Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (SPARK-1188) and has no performance impact in my measurements. It also simplifies the code. --- .../spark/graphx/impl/EdgePartition.scala | 38 ++++--------------- .../graphx/impl/EdgeTripletIterator.scala | 7 +--- 2 files changed, 9 insertions(+), 36 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 57fa5eefd5e09..bab353e1a01ec 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -62,18 +62,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * applied to each edge */ def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = { - val newData = new Array[ED2](data.size) - val edge = new Edge[ED]() - val size = data.size - var i = 0 - while (i < size) { - edge.srcId = srcIds(i) - edge.dstId = dstIds(i) - edge.attr = data(i) - newData(i) = f(edge) - i += 1 - } - new EdgePartition(srcIds, dstIds, newData, index) + val newData = (0 until data.size).map(i => f(Edge(srcIds(i), dstIds(i), data(i)))) + new EdgePartition(srcIds, dstIds, newData.toArray, index) } /** @@ -84,19 +74,13 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * order of the edges returned by `EdgePartition.iterator` and * should return attributes equal to the number of edges. * - * @param f a function from an edge to a new attribute + * @param iter an iterator for the new attribute values * @tparam ED2 the type of the new attribute - * @return a new edge partition with the result of the function `f` - * applied to each edge + * @return a new edge partition with the attribute values replaced */ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = { - val newData = new Array[ED2](data.size) - var i = 0 - while (iter.hasNext) { - newData(i) = iter.next() - i += 1 - } - assert(newData.size == i) + val newData = iter.toArray + assert(newData.size == data.size) new EdgePartition(srcIds, dstIds, newData, index) } @@ -191,15 +175,12 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return an iterator over edges in the partition */ def iterator = new Iterator[Edge[ED]] { - private[this] val edge = new Edge[ED] private[this] var pos = 0 override def hasNext: Boolean = pos < EdgePartition.this.size override def next(): Edge[ED] = { - edge.srcId = srcIds(pos) - edge.dstId = dstIds(pos) - edge.attr = data(pos) + val edge = Edge(srcIds(pos), dstIds(pos), data(pos)) pos += 1 edge } @@ -218,7 +199,6 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * cluster must start at position `index`. */ private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] { - private[this] val edge = new Edge[ED] private[this] var pos = index override def hasNext: Boolean = { @@ -227,9 +207,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) override def next(): Edge[ED] = { assert(srcIds(pos) == srcId) - edge.srcId = srcIds(pos) - edge.dstId = dstIds(pos) - edge.attr = data(pos) + val edge = Edge(srcIds(pos), dstIds(pos), data(pos)) pos += 1 edge } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala index 886c250d7cffd..220a89d73d711 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -37,20 +37,15 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( // Current position in the array. private var pos = 0 - // A triplet object that this iterator.next() call returns. We reuse this object to avoid - // allocating too many temporary Java objects. - private val triplet = new EdgeTriplet[VD, ED] - private val vmap = new PrimitiveKeyOpenHashMap[VertexId, VD](vidToIndex, vertexArray) override def hasNext: Boolean = pos < edgePartition.size override def next() = { + val triplet = new EdgeTriplet[VD, ED] triplet.srcId = edgePartition.srcIds(pos) - // assert(vmap.containsKey(e.src.id)) triplet.srcAttr = vmap(triplet.srcId) triplet.dstId = edgePartition.dstIds(pos) - // assert(vmap.containsKey(e.dst.id)) triplet.dstAttr = vmap(triplet.dstId) triplet.attr = edgePartition.data(pos) pos += 1 From 2da5e87bd324eada988e0477a3deb9deda914bd9 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 1 Apr 2014 12:33:59 +0200 Subject: [PATCH 3/7] Restore object re-use in EdgePartition. --- .../spark/graphx/impl/EdgePartition.scala | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index bab353e1a01ec..654902e4cc9ef 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -62,8 +62,18 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * applied to each edge */ def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = { - val newData = (0 until data.size).map(i => f(Edge(srcIds(i), dstIds(i), data(i)))) - new EdgePartition(srcIds, dstIds, newData.toArray, index) + val newData = new Array[ED2](data.size) + val edge = new Edge[ED]() + val size = data.size + var i = 0 + while (i < size) { + edge.srcId = srcIds(i) + edge.dstId = dstIds(i) + edge.attr = data(i) + newData(i) = f(edge) + i += 1 + } + new EdgePartition(srcIds, dstIds, newData, index) } /** @@ -175,12 +185,15 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return an iterator over edges in the partition */ def iterator = new Iterator[Edge[ED]] { + private[this] val edge = new Edge[ED] private[this] var pos = 0 override def hasNext: Boolean = pos < EdgePartition.this.size override def next(): Edge[ED] = { - val edge = Edge(srcIds(pos), dstIds(pos), data(pos)) + edge.srcId = srcIds(pos) + edge.dstId = dstIds(pos) + edge.attr = data(pos) pos += 1 edge } @@ -199,6 +212,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * cluster must start at position `index`. */ private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] { + private[this] val edge = new Edge[ED] private[this] var pos = index override def hasNext: Boolean = { @@ -207,7 +221,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) override def next(): Edge[ED] = { assert(srcIds(pos) == srcId) - val edge = Edge(srcIds(pos), dstIds(pos), data(pos)) + edge.srcId = srcIds(pos) + edge.dstId = dstIds(pos) + edge.attr = data(pos) pos += 1 edge } From 4ec77f854cea08e70fe1a5a8148a69dcbd5c5bcf Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 1 Apr 2014 13:02:32 +0200 Subject: [PATCH 4/7] Add comments about object re-use to the affected functions. --- .../org/apache/spark/graphx/impl/EdgePartition.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 654902e4cc9ef..7a8ca2c618cfe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -56,6 +56,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * Construct a new edge partition by applying the function f to all * edges in this partition. * + * Be careful not to keep references to the objects passed to `f`. + * To improve GC performance the same object is re-used for each call. + * * @param f a function from an edge to a new attribute * @tparam ED2 the type of the new attribute * @return a new edge partition with the result of the function `f` @@ -182,6 +185,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) /** * Get an iterator over the edges in this partition. * + * Be careful not to keep references to the objects from this iterator. + * To improve GC performance the same object is re-used in `next()`. + * * @return an iterator over edges in the partition */ def iterator = new Iterator[Edge[ED]] { @@ -210,6 +216,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) /** * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The * cluster must start at position `index`. + * + * Be careful not to keep references to the objects from this iterator. To improve GC performance + * the same object is re-used in `next()`. */ private def clusterIterator(srcId: VertexId, index: Int) = new Iterator[Edge[ED]] { private[this] val edge = new Edge[ED] From 49556977bab69f001b51ec1b5ca59400a0911149 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 1 Apr 2014 13:05:04 +0200 Subject: [PATCH 5/7] Create a copy of the Edge objects in EdgeRDD.compute(). This avoids exposing the object re-use, while still enables the more efficient behavior for internal code. --- graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index f2296a865e1b3..6d04bf790e3a5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -45,7 +45,8 @@ class EdgeRDD[@specialized ED: ClassTag]( partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { - firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator + val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context) + p.next._2.iterator.map(_.copy()) } override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() From 4117a64a88409c91002b139be626ee191e36043e Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 1 Apr 2014 14:19:35 +0200 Subject: [PATCH 6/7] Revert EdgePartitionSuite. --- .../spark/graphx/impl/EdgePartitionSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index 4c5f0a911d2f9..e135d1d7ad6a3 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -34,8 +34,8 @@ class EdgePartitionSuite extends FunSuite { builder.add(e.srcId, e.dstId, e.attr) } val edgePartition = builder.toEdgePartition - assert(edgePartition.reverse.iterator.toList === reversedEdges) - assert(edgePartition.reverse.reverse.iterator.toList === edges) + assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges) + assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges) } test("map") { @@ -45,7 +45,7 @@ class EdgePartitionSuite extends FunSuite { builder.add(e.srcId, e.dstId, e.attr) } val edgePartition = builder.toEdgePartition - assert(edgePartition.map(e => e.srcId + e.dstId).iterator.toList === + assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList === edges.map(e => e.copy(attr = e.srcId + e.dstId))) } @@ -58,7 +58,7 @@ class EdgePartitionSuite extends FunSuite { builder.add(e.srcId, e.dstId, e.attr) } val edgePartition = builder.toEdgePartition - assert(edgePartition.groupEdges(_ + _).iterator.toList === groupedEdges) + assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges) } test("indexIterator") { @@ -72,8 +72,8 @@ class EdgePartitionSuite extends FunSuite { val edgePartition = builder.toEdgePartition assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges) - assert(edgePartition.indexIterator(_ == 0).toList === edgesFrom0) - assert(edgePartition.indexIterator(_ == 1).toList === edgesFrom1) + assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0) + assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1) } test("innerJoin") { @@ -87,7 +87,7 @@ class EdgePartitionSuite extends FunSuite { val a = makeEdgePartition(aList) val b = makeEdgePartition(bList) - assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.toList === + assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList === List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0))) } } From 574302bc38d57ab35884cee2f1ace7afa88cd173 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Wed, 2 Apr 2014 12:05:29 +0200 Subject: [PATCH 7/7] Restore "manual" copying in EdgePartition.map(Iterator). Add comment to discourage novices like myself from trying to simplify the code. --- .../org/apache/spark/graphx/impl/EdgePartition.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 7a8ca2c618cfe..2e05f5d4e4969 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -92,8 +92,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return a new edge partition with the attribute values replaced */ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = { - val newData = iter.toArray - assert(newData.size == data.size) + // Faster than iter.toArray, because the expected size is known. + val newData = new Array[ED2](data.size) + var i = 0 + while (iter.hasNext) { + newData(i) = iter.next() + i += 1 + } + assert(newData.size == i) new EdgePartition(srcIds, dstIds, newData, index) }