1818package org .apache .spark .graphx .impl
1919
2020import scala .language .higherKinds
21+ import scala .language .implicitConversions
2122import scala .reflect .ClassTag
2223
2324import org .apache .spark .Logging
@@ -31,14 +32,14 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
3132 * implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
3233 * example, [[VertexPartition.VertexPartitionOpsConstructor ]]).
3334 */
34- private [graphx] abstract class VertexPartitionBaseOps [ VD : ClassTag , T [ X ] <: VertexPartitionBase [ X ]]
35- ( self : T [ VD ])
36- (implicit ev : VertexPartitionBaseOpsConstructor [ T ])
37- extends Logging {
35+ private [graphx] abstract class VertexPartitionBaseOps
36+ [ VD : ClassTag , Self [ X ] <: VertexPartitionBase [ X ] : VertexPartitionBaseOpsConstructor ]
37+ (self : Self [ VD ])
38+ extends Logging {
3839
39- def withIndex (index : VertexIdToIndexMap ): T [VD ]
40- def withValues [VD2 : ClassTag ](values : Array [VD2 ]): T [VD2 ]
41- def withMask (mask : BitSet ): T [VD ]
40+ def withIndex (index : VertexIdToIndexMap ): Self [VD ]
41+ def withValues [VD2 : ClassTag ](values : Array [VD2 ]): Self [VD2 ]
42+ def withMask (mask : BitSet ): Self [VD ]
4243
4344 /**
4445 * Pass each vertex attribute along with the vertex id through a map
@@ -53,7 +54,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
5354 * each of the entries in the original VertexRDD. The resulting
5455 * VertexPartition retains the same index.
5556 */
56- def map [VD2 : ClassTag ](f : (VertexId , VD ) => VD2 ): T [VD2 ] = {
57+ def map [VD2 : ClassTag ](f : (VertexId , VD ) => VD2 ): Self [VD2 ] = {
5758 // Construct a view of the map transformation
5859 val newValues = new Array [VD2 ](self.capacity)
5960 var i = self.mask.nextSetBit(0 )
@@ -73,7 +74,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
7374 * RDD can be easily joined with the original vertex-set. Furthermore, the filter only
7475 * modifies the bitmap index and so no new values are allocated.
7576 */
76- def filter (pred : (VertexId , VD ) => Boolean ): T [VD ] = {
77+ def filter (pred : (VertexId , VD ) => Boolean ): Self [VD ] = {
7778 // Allocate the array to store the results into
7879 val newMask = new BitSet (self.capacity)
7980 // Iterate over the active bits in the old mask and evaluate the predicate
@@ -91,7 +92,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
9192 * Hides vertices that are the same between this and other. For vertices that are different, keeps
9293 * the values from `other`. The indices of `this` and `other` must be the same.
9394 */
94- def diff (other : T [VD ]): T [VD ] = {
95+ def diff (other : Self [VD ]): Self [VD ] = {
9596 if (self.index != other.index) {
9697 logWarning(" Diffing two VertexPartitions with different indexes is slow." )
9798 diff(createUsingIndex(other.iterator))
@@ -104,14 +105,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
104105 }
105106 i = newMask.nextSetBit(i + 1 )
106107 }
107- ev. toOps(this .withValues(other.values)).withMask(newMask)
108+ toOps(this .withValues(other.values)).withMask(newMask)
108109 }
109110 }
110111
111112 /** Left outer join another VertexPartition. */
112113 def leftJoin [VD2 : ClassTag , VD3 : ClassTag ]
113- (other : T [VD2 ])
114- (f : (VertexId , VD , Option [VD2 ]) => VD3 ): T [VD3 ] = {
114+ (other : Self [VD2 ])
115+ (f : (VertexId , VD , Option [VD2 ]) => VD3 ): Self [VD3 ] = {
115116 if (self.index != other.index) {
116117 logWarning(" Joining two VertexPartitions with different indexes is slow." )
117118 leftJoin(createUsingIndex(other.iterator))(f)
@@ -131,14 +132,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
131132 /** Left outer join another iterator of messages. */
132133 def leftJoin [VD2 : ClassTag , VD3 : ClassTag ]
133134 (other : Iterator [(VertexId , VD2 )])
134- (f : (VertexId , VD , Option [VD2 ]) => VD3 ): T [VD3 ] = {
135+ (f : (VertexId , VD , Option [VD2 ]) => VD3 ): Self [VD3 ] = {
135136 leftJoin(createUsingIndex(other))(f)
136137 }
137138
138139 /** Inner join another VertexPartition. */
139140 def innerJoin [U : ClassTag , VD2 : ClassTag ]
140- (other : T [U ])
141- (f : (VertexId , VD , U ) => VD2 ): T [VD2 ] = {
141+ (other : Self [U ])
142+ (f : (VertexId , VD , U ) => VD2 ): Self [VD2 ] = {
142143 if (self.index != other.index) {
143144 logWarning(" Joining two VertexPartitions with different indexes is slow." )
144145 innerJoin(createUsingIndex(other.iterator))(f)
@@ -150,7 +151,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
150151 newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
151152 i = newMask.nextSetBit(i + 1 )
152153 }
153- ev.toOps( this .withValues(newValues) ).withMask(newMask)
154+ this .withValues(newValues).withMask(newMask)
154155 }
155156 }
156157
@@ -159,15 +160,15 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
159160 */
160161 def innerJoin [U : ClassTag , VD2 : ClassTag ]
161162 (iter : Iterator [Product2 [VertexId , U ]])
162- (f : (VertexId , VD , U ) => VD2 ): T [VD2 ] = {
163+ (f : (VertexId , VD , U ) => VD2 ): Self [VD2 ] = {
163164 innerJoin(createUsingIndex(iter))(f)
164165 }
165166
166167 /**
167168 * Similar effect as aggregateUsingIndex((a, b) => a)
168169 */
169170 def createUsingIndex [VD2 : ClassTag ](iter : Iterator [Product2 [VertexId , VD2 ]])
170- : T [VD2 ] = {
171+ : Self [VD2 ] = {
171172 val newMask = new BitSet (self.capacity)
172173 val newValues = new Array [VD2 ](self.capacity)
173174 iter.foreach { case (vid, vdata) =>
@@ -177,14 +178,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
177178 newValues(pos) = vdata
178179 }
179180 }
180- ev.toOps( this .withValues(newValues) ).withMask(newMask)
181+ this .withValues(newValues).withMask(newMask)
181182 }
182183
183184 /**
184185 * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
185186 * the partition, hidden by the bitmask.
186187 */
187- def innerJoinKeepLeft (iter : Iterator [Product2 [VertexId , VD ]]): T [VD ] = {
188+ def innerJoinKeepLeft (iter : Iterator [Product2 [VertexId , VD ]]): Self [VD ] = {
188189 val newMask = new BitSet (self.capacity)
189190 val newValues = new Array [VD ](self.capacity)
190191 System .arraycopy(self.values, 0 , newValues, 0 , newValues.length)
@@ -195,12 +196,12 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
195196 newValues(pos) = vdata
196197 }
197198 }
198- ev.toOps( this .withValues(newValues) ).withMask(newMask)
199+ this .withValues(newValues).withMask(newMask)
199200 }
200201
201202 def aggregateUsingIndex [VD2 : ClassTag ](
202203 iter : Iterator [Product2 [VertexId , VD2 ]],
203- reduceFunc : (VD2 , VD2 ) => VD2 ): T [VD2 ] = {
204+ reduceFunc : (VD2 , VD2 ) => VD2 ): Self [VD2 ] = {
204205 val newMask = new BitSet (self.capacity)
205206 val newValues = new Array [VD2 ](self.capacity)
206207 iter.foreach { product =>
@@ -216,22 +217,29 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
216217 }
217218 }
218219 }
219- ev.toOps( this .withValues(newValues) ).withMask(newMask)
220+ this .withValues(newValues).withMask(newMask)
220221 }
221222
222223 /**
223224 * Construct a new VertexPartition whose index contains only the vertices in the mask.
224225 */
225- def reindex (): T [VD ] = {
226+ def reindex (): Self [VD ] = {
226227 val hashMap = new PrimitiveKeyOpenHashMap [VertexId , VD ]
227228 val arbitraryMerge = (a : VD , b : VD ) => a
228229 for ((k, v) <- self.iterator) {
229230 hashMap.setMerge(k, v, arbitraryMerge)
230231 }
231- ev.toOps(
232- ev.toOps(
233- this .withIndex(hashMap.keySet))
234- .withValues(hashMap._values))
235- .withMask(hashMap.keySet.getBitSet)
232+ this .withIndex(hashMap.keySet).withValues(hashMap._values).withMask(hashMap.keySet.getBitSet)
233+ }
234+
235+ /**
236+ * Converts a vertex partition (in particular, one of type `Self`) into a
237+ * `VertexPartitionBaseOps`. Within this class, this allows chaining the methods defined above,
238+ * because these methods return a `Self` and this implicit conversion re-wraps that in a
239+ * `VertexPartitionBaseOps`. This relies on the context bound on `Self`.
240+ */
241+ private implicit def toOps [VD2 : ClassTag ](
242+ partition : Self [VD2 ]): VertexPartitionBaseOps [VD2 , Self ] = {
243+ implicitly[VertexPartitionBaseOpsConstructor [Self ]].toOps(partition)
236244 }
237245}
0 commit comments