Commit 18e6c97

java api zipWithIndex test
1 parent 11e2e7f commit 18e6c97

2 files changed: +39 -10 lines changed

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 6 additions & 3 deletions
@@ -268,7 +268,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
    * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
    */
-  def zipWithUniqueId():JavaPairRDD[T,Long] = JavaPairRDD.fromRDD(rdd.zipWithUniqueId())
+  def zipWithUniqueId[Long](): JavaPairRDD[T, Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, Long]]
+  }
 
   /**
    * Zips this RDD with its element indices. The ordering is first based on the partition index
@@ -277,8 +279,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
    * This method needs to trigger a spark job when this RDD contains more than one partitions.
    */
-  def zipWithIndex():JavaPairRDD[T,Long] = JavaPairRDD.fromRDD(rdd.zipWithIndex())
-
+  def zipWithIndex[Long](): JavaPairRDD[T, Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, Long]]
+  }
 
   // Actions (launch a job to return a value to the user program)
 
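For reference, a minimal usage sketch of the two methods this commit exposes to Java callers. This is not part of the commit; it assumes a local JavaSparkContext created only for illustration and mirrors the call pattern used in the test below.

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ZipExample {
  public static void main(String[] args) {
    // Local context, only for illustration.
    JavaSparkContext sc = new JavaSparkContext("local", "zip-example");
    JavaRDD<String> letters = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

    // zipWithIndex: consecutive indices 0..N-1, ordered first by partition index
    // and then by position within each partition; triggers a job when the RDD
    // has more than one partition.
    JavaPairRDD<String, Long> indexed = letters.zipWithIndex();

    // zipWithUniqueId: ids k, n+k, 2*n+k, ... for partition k of n partitions,
    // so they are unique but may contain gaps; does not trigger a job.
    JavaPairRDD<String, Long> uniqueIds = letters.zipWithUniqueId();

    System.out.println(indexed.collect());
    System.out.println(uniqueIds.collect());
    sc.stop();
  }
}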
core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 33 additions & 7 deletions
@@ -182,13 +182,39 @@ public void call(String s) {
     Assert.assertEquals(2, foreachCalls);
   }
 
-  @Test
-  public void toLocalIterator() {
-    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
-    JavaRDD<Integer> rdd = sc.parallelize(correct);
-    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
-    Assert.assertTrue(correct.equals(result));
-  }
+  @Test
+  public void toLocalIterator() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+    Assert.assertTrue(correct.equals(result));
+  }
+
+  @Test
+  public void zipWithUniqueId() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(correct).zipWithUniqueId();
+    JavaRDD<Long> indexes = zip.map(new Function<Tuple2<Integer, Long>, Long>() {
+      @Override
+      public Long call(Tuple2<Integer, Long> t) throws Exception {
+        return t._2();
+      }
+    });
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+  }
+
+  @Test
+  public void zipWithIndex() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(correct).zipWithIndex();
+    JavaRDD<Long> indexes = zip.map(new Function<Tuple2<Integer, Long>, Long>() {
+      @Override
+      public Long call(Tuple2<Integer, Long> t) throws Exception {
+        return t._2();
+      }
+    });
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+  }
 
   @SuppressWarnings("unchecked")
   @Test

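A hedged follow-up sketch, not part of the commit: since the Scaladoc above states that zipWithIndex assigns the consecutive indices 0..N-1 in partition order, the test could assert the exact sequence rather than only distinctness. The sketch assumes the suite's existing sc field and imports, plus JavaPairRDD's values() method.

  @Test
  public void zipWithIndexExactOrder() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaPairRDD<Integer, Long> zip = rdd.zipWithIndex();
    // values() keeps the pair order, so collect() returns the indices in order.
    List<Long> indexes = zip.values().collect();
    Assert.assertEquals(Arrays.asList(0L, 1L, 2L, 3L), indexes);
  }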