From 11e2e7f523668e042a1847fa97d92f896a5bc75b Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 16 Apr 2014 12:55:57 +0800
Subject: [PATCH 1/5] add zipWithIndex zipWithUniqueId methods to java api

---
 .../org/apache/spark/api/java/JavaRDDLike.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 725c423a53e35..bddecd24493ed 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -263,6 +263,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
 
+  /**
+   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
+   * 2*n+k, ..., where n is the number of partitions. So there may be gaps, but this method
+   * won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+   */
+  def zipWithUniqueId(): JavaPairRDD[T, Long] = JavaPairRDD.fromRDD(rdd.zipWithUniqueId())
+
+  /**
+   * Zips this RDD with its element indices. The ordering is first based on the partition index
+   * and then on the ordering of items within each partition. So the first item in the first
+   * partition gets index 0, and the last item in the last partition receives the largest index.
+   * This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type.
+   * This method needs to trigger a Spark job when this RDD contains more than one partition.
+   */
+  def zipWithIndex(): JavaPairRDD[T, Long] = JavaPairRDD.fromRDD(rdd.zipWithIndex())
+
+
   // Actions (launch a job to return a value to the user program)
 
   /**
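Before the review iterations that follow, here is what the two new methods promise in practice. This is a minimal, hypothetical Java sketch: the local master, the letters RDD, and the two-partition split are illustrative assumptions, and the Long pair type assumes the java.lang.Long signatures that the final patch in this series settles on. With n = 2 partitions, item i of partition k gets unique id i*n + k, so the ids are unique but may have gaps, while zipWithIndex assigns consecutive indices after first running a job to count each partition.

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Illustrative sketch only; names and the local master are assumptions.
    public class ZipSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "zip-sketch");

        // Four elements in n = 2 partitions: ["a", "b"] and ["c", "d"].
        JavaRDD<String> letters = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

        // zipWithUniqueId: partition 0 gets ids 0 and 2, partition 1 gets 1 and 3.
        // The ids may have gaps, but no Spark job is triggered.
        JavaPairRDD<String, Long> unique = letters.zipWithUniqueId();
        System.out.println(unique.collect()); // [(a,0), (b,2), (c,1), (d,3)]

        // zipWithIndex: consecutive indices in partition order; this triggers a
        // job to count the elements of each partition first.
        JavaPairRDD<String, Long> indexed = letters.zipWithIndex();
        System.out.println(indexed.collect()); // [(a,0), (b,1), (c,2), (d,3)]

        sc.stop();
      }
    }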
From 18e6c97f51da5027f95ce9757c41263886a8233c Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 16 Apr 2014 15:35:42 +0800
Subject: [PATCH 2/5] java api zipWithIndex test

---
 .../apache/spark/api/java/JavaRDDLike.scala |  9 +++--
 .../java/org/apache/spark/JavaAPISuite.java | 40 +++++++++++++++----
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index bddecd24493ed..1ee941c061723 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -268,7 +268,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * 2*n+k, ..., where n is the number of partitions. So there may be gaps, but this method
    * won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
    */
-  def zipWithUniqueId(): JavaPairRDD[T, Long] = JavaPairRDD.fromRDD(rdd.zipWithUniqueId())
+  def zipWithUniqueId[Long](): JavaPairRDD[T, Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, Long]]
+  }
 
   /**
    * Zips this RDD with its element indices. The ordering is first based on the partition index
@@ -277,8 +279,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type.
    * This method needs to trigger a Spark job when this RDD contains more than one partition.
    */
-  def zipWithIndex(): JavaPairRDD[T, Long] = JavaPairRDD.fromRDD(rdd.zipWithIndex())
-
+  def zipWithIndex[Long](): JavaPairRDD[T, Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, Long]]
+  }
 
   // Actions (launch a job to return a value to the user program)
 
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 8d2e9f1846343..18b74ba263fbd 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -182,13 +182,39 @@ public void call(String s) {
     Assert.assertEquals(2, foreachCalls);
   }
 
-  @Test
-  public void toLocalIterator() {
-    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
-    JavaRDD<Integer> rdd = sc.parallelize(correct);
-    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
-    Assert.assertTrue(correct.equals(result));
-  }
+  @Test
+  public void toLocalIterator() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+    Assert.assertTrue(correct.equals(result));
+  }
+
+  @Test
+  public void zipWithUniqueId() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(correct).zipWithUniqueId();
+    JavaRDD<Long> indexes = zip.map(new Function<Tuple2<Integer, Long>, Long>() {
+      @Override
+      public Long call(Tuple2<Integer, Long> t) throws Exception {
+        return t._2();
+      }
+    });
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+  }
+
+  @Test
+  public void zipWithIndex() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(correct).zipWithIndex();
+    JavaRDD<Long> indexes = zip.map(new Function<Tuple2<Integer, Long>, Long>() {
+      @Override
+      public Long call(Tuple2<Integer, Long> t) throws Exception {
+        return t._2();
+      }
+    });
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+  }
 
   @SuppressWarnings("unchecked")
   @Test

From daa8f84f32701049aa85699780e9ebcc440fed5c Mon Sep 17 00:00:00 2001
From: witgo
Date: Wed, 23 Apr 2014 10:49:27 +0800
Subject: [PATCH 3/5] review commit

---
 .../java/org/apache/spark/JavaAPISuite.java | 25 ++++++-------------
 1 file changed, 8 insertions(+), 17 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 18b74ba263fbd..415dac4ba7b4e 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -192,28 +192,19 @@ public void toLocalIterator() {
 
   @Test
   public void zipWithUniqueId() {
-    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
-    JavaPairRDD<Integer, Long> zip = sc.parallelize(correct).zipWithUniqueId();
-    JavaRDD<Long> indexes = zip.map(new Function<Tuple2<Integer, Long>, Long>() {
-      @Override
-      public Long call(Tuple2<Integer, Long> t) throws Exception {
-        return t._2();
-      }
-    });
+    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
+    JavaRDD<Long> indexes = zip.values();
     Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
   }
 
   @Test
   public void zipWithIndex() {
-    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
-    JavaPairRDD<Integer, Long> zip = sc.parallelize(correct).zipWithIndex();
-    JavaRDD<Long> indexes = zip.map(new Function<Tuple2<Integer, Long>, Long>() {
-      @Override
-      public Long call(Tuple2<Integer, Long> t) throws Exception {
-        return t._2();
-      }
-    });
-    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
+    JavaRDD<Long> indexes = zip.values();
+    HashSet<Long> correctIndexes = new HashSet<Long>(Arrays.asList(0l, 1l, 2l, 3l));
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()) == correctIndexes);
   }
 
   @SuppressWarnings("unchecked")
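Note that the assertion introduced for zipWithIndex above compares two freshly built HashSet instances with ==, which tests reference identity rather than element equality and therefore always fails; the next review commit replaces it with equals on an ordered List. A standalone illustration of the difference, in plain Java with illustrative names and no Spark involved:

    import java.util.Arrays;
    import java.util.HashSet;

    // Reference identity (==) versus value equality (equals) on collections.
    public class EqualsVsIdentity {
      public static void main(String[] args) {
        HashSet<Long> a = new HashSet<Long>(Arrays.asList(0L, 1L, 2L, 3L));
        HashSet<Long> b = new HashSet<Long>(Arrays.asList(0L, 1L, 2L, 3L));

        System.out.println(a == b);      // false: two distinct objects
        System.out.println(a.equals(b)); // true: same elements
      }
    }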
From 59747d1b8e5eeaf1dc8cb9ae11dd5fd4ddfef68a Mon Sep 17 00:00:00 2001
From: witgo
Date: Thu, 24 Apr 2014 10:16:46 +0800
Subject: [PATCH 4/5] review commit

---
 core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 415dac4ba7b4e..27e8e268ce485 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -203,8 +203,8 @@ public void zipWithIndex() {
     List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
     JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
     JavaRDD<Long> indexes = zip.values();
-    HashSet<Long> correctIndexes = new HashSet<Long>(Arrays.asList(0l, 1l, 2l, 3l));
-    Assert.assertTrue(new HashSet<Long>(indexes.collect()) == correctIndexes);
+    List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
+    Assert.assertTrue(indexes.collect().equals(correctIndexes));
   }
 
   @SuppressWarnings("unchecked")

From 24d74c9f4ca06fa54384111f7da213fab7e6f95e Mon Sep 17 00:00:00 2001
From: witgo
Date: Tue, 29 Apr 2014 15:26:57 +0800
Subject: [PATCH 5/5] review commit

---
 .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 1ee941c061723..f3022a3513038 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList, Iterator => JIterator}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -268,8 +268,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * 2*n+k, ..., where n is the number of partitions. So there may be gaps, but this method
    * won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
    */
-  def zipWithUniqueId[Long](): JavaPairRDD[T, Long] = {
-    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, Long]]
+  def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
   }
 
   /**
@@ -279,8 +279,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type.
    * This method needs to trigger a Spark job when this RDD contains more than one partition.
    */
-  def zipWithIndex[Long](): JavaPairRDD[T, Long] = {
-    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, Long]]
+  def zipWithIndex(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
   }
 
   // Actions (launch a job to return a value to the user program)
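This last commit is what makes the methods usable from Java: a scala.Long in a generic position erases to Object, so the first patch's signatures surfaced as JavaPairRDD<T, Object> on the Java side, and the interim [Long] type parameter merely shadowed the name. Aliasing java.lang.Long as JLong yields properly typed pairs. A short sketch of what the final signatures permit, under the same illustrative assumptions as before (the sc context and data RDD are made up for the example):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // With java.lang.Long in the signature, the pair values unbox cleanly.
    public class FinalApiSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "final-api-sketch");
        JavaRDD<String> data = sc.parallelize(Arrays.asList("a", "b", "c"), 2);

        JavaPairRDD<String, Long> indexed = data.zipWithIndex();
        List<Long> indexes = indexed.values().collect(); // [0, 1, 2]
        long last = indexes.get(indexes.size() - 1);     // unboxes to 2

        System.out.println(last);
        sc.stop();
      }
    }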