From 6e2b87b4bb414f69753cc5208099baff8d54f002 Mon Sep 17 00:00:00 2001 From: lirui Date: Wed, 27 Aug 2014 13:02:01 +0800 Subject: [PATCH 01/12] SPARK-2636: add java API for async actions --- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f917cfd1419ec..c2eacbd1d090c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag @@ -574,4 +574,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name + def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { + val cleanF = rdd.context.clean((x: T) => f.call(x)) + import org.apache.spark.SparkContext._ + rdd.foreachAsync(cleanF) + } + } From eb1ee798a41f89bc996bc1661968b7cf75c48d6e Mon Sep 17 00:00:00 2001 From: lirui Date: Wed, 27 Aug 2014 14:51:56 +0800 Subject: [PATCH 02/12] SPARK-2636: change some parameters in SimpleFutureAction to member field --- core/src/main/scala/org/apache/spark/FutureAction.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1e4dec86a0530..8ea8989a44396 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -92,7 +92,7 @@ trait FutureAction[T] extends Future[T] { * count, collect, reduce. */ @Experimental -class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) +class SimpleFutureAction[T] private[spark](val jobWaiter: JobWaiter[_], val resultFunc: => T) extends FutureAction[T] { override def cancel() { From d09f73216005bef498f4086d40860a8bdd587940 Mon Sep 17 00:00:00 2001 From: lirui Date: Wed, 27 Aug 2014 14:57:11 +0800 Subject: [PATCH 03/12] SPARK-2636: fix build --- core/src/main/scala/org/apache/spark/FutureAction.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 8ea8989a44396..2af0d69294251 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -92,7 +92,7 @@ trait FutureAction[T] extends Future[T] { * count, collect, reduce. */ @Experimental -class SimpleFutureAction[T] private[spark](val jobWaiter: JobWaiter[_], val resultFunc: => T) +class SimpleFutureAction[T] private[spark](val jobWaiter: JobWaiter[_], resultFunc: => T) extends FutureAction[T] { override def cancel() { From 1b25abc86516f1e2cbbab9bdb03dffa920bb8658 Mon Sep 17 00:00:00 2001 From: lirui Date: Wed, 27 Aug 2014 15:16:38 +0800 Subject: [PATCH 04/12] SPARK-2636: expose some fields in JobWaiter --- .../src/main/scala/org/apache/spark/scheduler/JobWaiter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index e9bfee2248e5b..c0441f00f0cd3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -23,8 +23,8 @@ package org.apache.spark.scheduler */ private[spark] class JobWaiter[T]( dagScheduler: DAGScheduler, - jobId: Int, - totalTasks: Int, + val jobId: Int, + val totalTasks: Int, resultHandler: (Int, T) => Unit) extends JobListener { From fbf574443ffe63f8d449fd639093016cb064283d Mon Sep 17 00:00:00 2001 From: lirui Date: Wed, 27 Aug 2014 22:35:02 +0800 Subject: [PATCH 05/12] SPARK-2636: add more async actions for java api --- .../apache/spark/api/java/JavaRDDLike.scala | 60 ++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index c2eacbd1d090c..5c9bcda160268 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -21,16 +21,18 @@ import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable, Long => JLong} import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} +import org.apache.spark._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _} +import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -580,4 +582,60 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { rdd.foreachAsync(cleanF) } + def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): FutureAction[Unit] = { + import org.apache.spark.SparkContext._ + rdd.foreachPartitionAsync(x => f.call(asJavaIterator(x))) + } + + def countAsync(): FutureAction[Long] = { + import org.apache.spark.SparkContext._ + rdd.countAsync() + } + + def collectAsync(): FutureAction[JList[T]] = { + val results = new Array[Array[T]](rdd.partitions.size) + rdd.context.submitJob[T, Array[T], JList[T]](rdd, _.toArray, Range(0, rdd.partitions.size), + (index, data) => results(index) = data, new java.util.ArrayList[T](results.flatten.toSeq)) + } + + def takeAsync(num: Int): FutureAction[JList[T]] = { + val f = new ComplexFutureAction[JList[T]] + f.run { + val results = new ArrayBuffer[T](num) + val totalParts = rdd.partitions.length + var partsScanned = 0 + while (results.size < num && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1 + if (partsScanned > 0) { + // If we didn't find any rows after the first iteration, just try all partitions next. + // Otherwise, interpolate the number of partitions we need to try, but overestimate it + // by 50%. + if (results.size == 0) { + numPartsToTry = totalParts - 1 + } else { + numPartsToTry = (1.5 * num * partsScanned / results.size).toInt + } + } + numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions + + val left = num - results.size + val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) + + val buf = new Array[Array[T]](p.size) + f.runJob(rdd, + (it: Iterator[T]) => it.take(left).toArray, + p, + (index: Int, data: Array[T]) => buf(index) = data, + Unit) + + buf.foreach(results ++= _.take(num - results.size)) + partsScanned += numPartsToTry + } + new java.util.ArrayList[T](results.toSeq) + } + f + } + } From 843276c6996a900656e8da37d6dcf38feee93cc0 Mon Sep 17 00:00:00 2001 From: lirui Date: Thu, 28 Aug 2014 15:22:32 +0800 Subject: [PATCH 06/12] SPARK-2636: only keep foreachAsync in the java API --- .../scala/org/apache/spark/FutureAction.scala | 9 ++- .../apache/spark/api/java/JavaRDDLike.scala | 56 ------------------- .../apache/spark/scheduler/JobWaiter.scala | 2 +- 3 files changed, 9 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 2af0d69294251..580922d3eb404 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -92,7 +92,7 @@ trait FutureAction[T] extends Future[T] { * count, collect, reduce. */ @Experimental -class SimpleFutureAction[T] private[spark](val jobWaiter: JobWaiter[_], resultFunc: => T) +class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) extends FutureAction[T] { override def cancel() { @@ -149,6 +149,13 @@ class SimpleFutureAction[T] private[spark](val jobWaiter: JobWaiter[_], resultFu case JobFailed(e: Exception) => scala.util.Failure(e) } } + + /** + * Get the corresponding job Id for this action + */ + def jobId(): Int = { + jobWaiter.jobId + } } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 5c9bcda160268..f82ede2b0179e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -582,60 +582,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { rdd.foreachAsync(cleanF) } - def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): FutureAction[Unit] = { - import org.apache.spark.SparkContext._ - rdd.foreachPartitionAsync(x => f.call(asJavaIterator(x))) - } - - def countAsync(): FutureAction[Long] = { - import org.apache.spark.SparkContext._ - rdd.countAsync() - } - - def collectAsync(): FutureAction[JList[T]] = { - val results = new Array[Array[T]](rdd.partitions.size) - rdd.context.submitJob[T, Array[T], JList[T]](rdd, _.toArray, Range(0, rdd.partitions.size), - (index, data) => results(index) = data, new java.util.ArrayList[T](results.flatten.toSeq)) - } - - def takeAsync(num: Int): FutureAction[JList[T]] = { - val f = new ComplexFutureAction[JList[T]] - f.run { - val results = new ArrayBuffer[T](num) - val totalParts = rdd.partitions.length - var partsScanned = 0 - while (results.size < num && partsScanned < totalParts) { - // The number of partitions to try in this iteration. It is ok for this number to be - // greater than totalParts because we actually cap it at totalParts in runJob. - var numPartsToTry = 1 - if (partsScanned > 0) { - // If we didn't find any rows after the first iteration, just try all partitions next. - // Otherwise, interpolate the number of partitions we need to try, but overestimate it - // by 50%. - if (results.size == 0) { - numPartsToTry = totalParts - 1 - } else { - numPartsToTry = (1.5 * num * partsScanned / results.size).toInt - } - } - numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions - - val left = num - results.size - val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) - - val buf = new Array[Array[T]](p.size) - f.runJob(rdd, - (it: Iterator[T]) => it.take(left).toArray, - p, - (index: Int, data: Array[T]) => buf(index) = data, - Unit) - - buf.foreach(results ++= _.take(num - results.size)) - partsScanned += numPartsToTry - } - new java.util.ArrayList[T](results.toSeq) - } - f - } - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index c0441f00f0cd3..29879b374b801 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -24,7 +24,7 @@ package org.apache.spark.scheduler private[spark] class JobWaiter[T]( dagScheduler: DAGScheduler, val jobId: Int, - val totalTasks: Int, + totalTasks: Int, resultHandler: (Int, T) => Unit) extends JobListener { From af4f5d98825a4622ac9ad7d076f59be5ceb7328f Mon Sep 17 00:00:00 2001 From: lirui Date: Thu, 28 Aug 2014 15:26:03 +0800 Subject: [PATCH 07/12] SPARK-2636: remove unused imports --- .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index f82ede2b0179e..c2eacbd1d090c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -21,18 +21,16 @@ import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable, Long => JLong} import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark._ +import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _} -import scala.concurrent.ExecutionContext.Implicits.global import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel From 3fa39f70c56cfd471c2ff2f7a92d2af3db3fb827 Mon Sep 17 00:00:00 2001 From: lirui Date: Fri, 29 Aug 2014 14:36:18 +0800 Subject: [PATCH 08/12] SPARK-2636: refine the patch --- core/src/main/scala/org/apache/spark/FutureAction.scala | 2 +- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 9 +++++++-- .../scala/org/apache/spark/rdd/AsyncRDDActions.scala | 3 ++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 580922d3eb404..4da53e8e0f8f0 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -153,7 +153,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: /** * Get the corresponding job Id for this action */ - def jobId(): Int = { + def id: Int = { jobWaiter.jobId } } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index c2eacbd1d090c..7462574683a83 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -574,10 +574,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name + /** + * The asynchronous version of the foreach action. + * + * @param f the function to apply to all the elements of the RDD + * @return a FutureAction for the action + */ def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { - val cleanF = rdd.context.clean((x: T) => f.call(x)) import org.apache.spark.SparkContext._ - rdd.foreachAsync(cleanF) + rdd.foreachAsync(x => f.call(x)) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index aed951a40b40c..b62f3fbdc4a15 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi * Applies a function f to all elements of this RDD. */ def foreachAsync(f: T => Unit): FutureAction[Unit] = { - self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size), + val cleanF = self.context.clean(f) + self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size), (index, data) => Unit, Unit) } From 0ca320dc3f67b4a8d3be0bee44ccae1434c5d596 Mon Sep 17 00:00:00 2001 From: lirui Date: Fri, 29 Aug 2014 14:48:42 +0800 Subject: [PATCH 09/12] SPARK-2636: fix method name & javadoc --- core/src/main/scala/org/apache/spark/FutureAction.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 4da53e8e0f8f0..75ea535f2f57b 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -150,12 +150,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } } - /** - * Get the corresponding job Id for this action - */ - def id: Int = { - jobWaiter.jobId - } + /** Get the corresponding job id for this action. */ + def jobId = jobWaiter.jobId } From e2e01d55d72d847c994c22b7d143526cbe44285d Mon Sep 17 00:00:00 2001 From: lirui Date: Fri, 29 Aug 2014 17:08:11 +0800 Subject: [PATCH 10/12] SPARK-2636: add mima exclude --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 300589394b96f..2ddb1644a3963 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,6 +41,9 @@ object MimaExcludes { Seq( // Adding new method to JavaRDLike trait - we should probably mark this as a developer API. ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"), + // Should probably mark this as Experimental + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachAsync"), // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values // for countApproxDistinct* functions, which does not work in Java. We later removed // them, and use the following to tell Mima to not care about them. From 5536d55148c8c4cbcded910e5687ebb1b0c551d1 Mon Sep 17 00:00:00 2001 From: lirui Date: Sat, 30 Aug 2014 22:30:20 +0800 Subject: [PATCH 11/12] SPARK-2636: mark the async API as experimental --- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 7462574683a83..43ad092a32671 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -575,6 +575,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name /** + * :: Experimental :: + * THIS IS AN EXPERIMENTAL API THAT MIGHT CHANGE IN THE FUTURE. * The asynchronous version of the foreach action. * * @param f the function to apply to all the elements of the RDD From ccaafb7b2c7c303091f3c2129235fd60bd875049 Mon Sep 17 00:00:00 2001 From: lirui Date: Mon, 1 Sep 2014 09:22:20 +0800 Subject: [PATCH 12/12] SPARK-2636: fix java doc --- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 43ad092a32671..545bc0e9e99ed 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -576,12 +576,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * :: Experimental :: - * THIS IS AN EXPERIMENTAL API THAT MIGHT CHANGE IN THE FUTURE. * The asynchronous version of the foreach action. * * @param f the function to apply to all the elements of the RDD * @return a FutureAction for the action */ + @Experimental def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { import org.apache.spark.SparkContext._ rdd.foreachAsync(x => f.call(x))