@@ -44,7 +44,7 @@ object ShippableVertexPartition {
4444 */
4545 def apply [VD : ClassTag ](
4646 iter : Iterator [(VertexId , VD )], routingTable : RoutingTablePartition , defaultVal : VD )
47- : ShippableVertexPartition [VD ] =
47+ : ShippableVertexPartition [VD ] =
4848 apply(iter, routingTable, defaultVal, (a, b) => a)
4949
5050 /**
@@ -54,12 +54,18 @@ object ShippableVertexPartition {
5454 */
5555 def apply [VD : ClassTag ](
5656 iter : Iterator [(VertexId , VD )], routingTable : RoutingTablePartition , defaultVal : VD ,
57- mergeFunc : (VD , VD ) => VD
58- )
59- : ShippableVertexPartition [VD ] = {
60- val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal))
61- val (index, values, mask) = VertexPartitionBase .initFrom(fullIter, mergeFunc)
62- new ShippableVertexPartition (index, values, mask, routingTable)
57+ mergeFunc : (VD , VD ) => VD ): ShippableVertexPartition [VD ] = {
58+ val map = new GraphXPrimitiveKeyOpenHashMap [VertexId , VD ]
59+ // Merge the given vertices using mergeFunc
60+ iter.foreach { pair =>
61+ map.setMerge(pair._1, pair._2, mergeFunc)
62+ }
63+ // Fill in missing vertices mentioned in the routing table
64+ routingTable.iterator.foreach { vid =>
65+ map.changeValue(vid, defaultVal, identity)
66+ }
67+
68+ new ShippableVertexPartition (map.keySet, map._values, map.keySet.getBitSet, routingTable)
6369 }
6470
6571 import scala .language .implicitConversions
0 commit comments