From 346e46ed8789ab72c709bec40c728568fd7294e5 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 10 Oct 2014 19:16:49 -0700
Subject: [PATCH 1/7] [SPARK-3902] Stabilize AsyncRDDActions; add Java API.

---
 .../spark/api/java/JavaFutureAction.java      |  33 ++++++
 .../scala/org/apache/spark/FutureAction.scala |  85 +++++++++++---
 .../apache/spark/api/java/JavaRDDLike.scala   |  55 +++++++--
 .../apache/spark/rdd/AsyncRDDActions.scala    |   3 -
 .../java/org/apache/spark/JavaAPISuite.java   | 111 +++++++++++++++++-
 5 files changed, 254 insertions(+), 33 deletions(-)
 create mode 100644 core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java

diff --git a/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java b/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
new file mode 100644
index 0000000000000..0ad189633e427
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+public interface JavaFutureAction<T> extends Future<T> {
+
+  /**
+   * Returns the job IDs run by the underlying async operation.
+   *
+   * This returns the current snapshot of the job list. Certain operations may run multiple
+   * jobs, so multiple calls to this method may return different lists.
+   */
+  List<Integer> jobIds();
+}
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index e8f761eaa5799..005f049c28880 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,20 +17,20 @@
 
 package org.apache.spark
 
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
+import java.util.concurrent.TimeUnit
 
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
 
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Try}
+
 /**
- * :: Experimental ::
  * A future for the result of an action to support cancellation. This is an extension of the
  * Scala Future interface to support cancellation.
  */
-@Experimental
 trait FutureAction[T] extends Future[T] {
   // Note that we redefine methods of the Future trait here explicitly so we can specify a different
   // documentation (with reference to the word "action").
@@ -69,6 +69,11 @@ trait FutureAction[T] extends Future[T] {
    */
   override def isCompleted: Boolean
 
+  /**
+   * Returns whether the action has been cancelled.
+ */ + def isCancelled: Boolean + /** * The value of this Future. * @@ -96,15 +101,16 @@ trait FutureAction[T] extends Future[T] { /** - * :: Experimental :: * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. */ -@Experimental class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) extends FutureAction[T] { + @volatile private var _cancelled: Boolean = false + override def cancel() { + _cancelled = true jobWaiter.cancel() } @@ -143,6 +149,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } override def isCompleted: Boolean = jobWaiter.jobFinished + + override def isCancelled: Boolean = _cancelled override def value: Option[Try[T]] = { if (jobWaiter.jobFinished) { @@ -164,12 +172,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: /** - * :: Experimental :: * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take, * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the * action thread if it is being blocked by a job. */ -@Experimental class ComplexFutureAction[T] extends FutureAction[T] { // Pointer to the thread that is executing the action. It is set when the action is run. @@ -222,7 +228,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { // If the action hasn't been cancelled yet, submit the job. The check and the submitJob // command need to be in an atomic block. val job = this.synchronized { - if (!cancelled) { + if (!isCancelled) { rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) } else { throw new SparkException("Action has been cancelled") @@ -243,10 +249,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { } } - /** - * Returns whether the promise has been cancelled. - */ - def cancelled: Boolean = _cancelled + override def isCancelled: Boolean = _cancelled @throws(classOf[InterruptedException]) @throws(classOf[scala.concurrent.TimeoutException]) @@ -271,3 +274,55 @@ class ComplexFutureAction[T] extends FutureAction[T] { def jobIds = jobs } + +private[spark] +class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T) + extends JavaFutureAction[T] { + + import scala.collection.JavaConverters._ + + override def isCancelled: Boolean = futureAction.isCancelled + + override def isDone: Boolean = { + // According to java.util.Future's Javadoc, this returns True if the task was completed, + // whether that completion was due to succesful execution, an exception, or a cancellation. 
+ futureAction.isCancelled || futureAction.isCompleted + } + + override def jobIds(): java.util.List[java.lang.Integer] = { + new java.util.ArrayList(futureAction.jobIds.map(x => new Integer(x)).asJava) + } + + private def getImpl(timeout: Duration): T = { + // This will throw TimeoutException on timeout: + Await.ready(futureAction, timeout) + futureAction.value.get match { + case scala.util.Success(value) => converter(value) + case Failure(exception) => + if (isCancelled) { + throw new CancellationException("Job cancelled: ${exception.message}"); + } else { + // java.util.Future.get() wraps exceptions in ExecutionException + throw new ExecutionException("Exception thrown by job: ", exception) + } + } + } + + override def get(): T = getImpl(Duration.Inf) + + override def get(timeout: Long, unit: TimeUnit): T = + getImpl(Duration.fromNanos(unit.toNanos(timeout))) + + override def cancel(mayInterruptIfRunning: Boolean): Boolean = { + if (isDone) { + // According to java.util.Future's Javadoc, this should return false if the task is completed. + false + } else { + // We're limited in terms of the semantics we can provide here; our cancellation is + // asynchronous and doesn't provide a mechanism to not cancel if the job is running. + futureAction.cancel() + true + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 545bc0e9e99ed..807fcd3db4733 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,6 +17,7 @@ package org.apache.spark.api.java +import java.util import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable, Long => JLong} @@ -26,7 +27,7 @@ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext} +import org.apache.spark._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag @@ -293,8 +294,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Applies a function f to all elements of this RDD. */ def foreach(f: VoidFunction[T]) { - val cleanF = rdd.context.clean((x: T) => f.call(x)) - rdd.foreach(cleanF) + rdd.foreach(x => f.call(x)) } /** @@ -575,16 +575,49 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name /** - * :: Experimental :: - * The asynchronous version of the foreach action. - * - * @param f the function to apply to all the elements of the RDD - * @return a FutureAction for the action + * The asynchronous version of `count`, which returns a + * future for counting the number of elements in this RDD. */ - @Experimental - def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { + def countAsync(): JavaFutureAction[JLong] = { + import org.apache.spark.SparkContext._ + new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), x => new JLong(x)) + } + + /** + * The asynchronous version of `collect`, which returns a future for + * retrieving an array containing all of the elements in this RDD. 
+ */ + def collectAsync(): JavaFutureAction[JList[T]] = { + import org.apache.spark.SparkContext._ + new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => new java.util.ArrayList(x)) + } + + /** + * The asynchronous version of the `take` action, which returns a + * future for retrieving the first `num` elements of this RDD. + */ + def takeAsync(num: Int): JavaFutureAction[JList[T]] = { import org.apache.spark.SparkContext._ - rdd.foreachAsync(x => f.call(x)) + new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => new java.util.ArrayList(x)) } + /** + * The asynchronous version of the `foreach` action, which + * applies a function f to all the elements of this RDD. + */ + def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = { + import org.apache.spark.SparkContext._ + new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)), + { x => null.asInstanceOf[Void] }) + } + + /** + * The asynchronous version of the `foreachPartition` action, which + * applies a function f to each partition of this RDD. + */ + def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = { + import org.apache.spark.SparkContext._ + new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)), + { x => null.asInstanceOf[Void] }) + } } diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index b62f3fbdc4a15..c5c739e7f480b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.reflect.ClassTag import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} -import org.apache.spark.annotation.Experimental /** - * :: Experimental :: * A set of asynchronous RDD actions available through an implicit conversion. * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions. 
 */
-@Experimental
 class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
 
   /**
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 4a078435447e5..8c7dc04cd1068 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -20,7 +20,9 @@
 import java.io.*;
 import java.net.URI;
 import java.util.*;
+import java.util.concurrent.*;
 
+import org.apache.spark.api.java.*;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
@@ -43,10 +45,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.partial.BoundedDouble;
@@ -1308,6 +1306,111 @@ public void collectUnderlyingScalaRDD() {
     Assert.assertEquals(data.size(), collected.length);
   }
 
+  private static final class IdentityWithDelay<T> implements Function<T, T> {
+
+    final int delayMillis;
+
+    IdentityWithDelay(int delayMillis) {
+      this.delayMillis = delayMillis;
+    }
+
+    @Override
+    public T call(T x) throws Exception {
+      Thread.sleep(delayMillis);
+      return x;
+    }
+  }
+
+  private static final class BuggyMapFunction<T> implements Function<T, T> {
+
+    @Override
+    public T call(T x) throws Exception {
+      throw new IllegalStateException("Custom exception!");
+    }
+  }
+
+  @Test
+  public void collectAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<List<Integer>> future =
+        rdd.map(new IdentityWithDelay<Integer>(200)).collectAsync();
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertFalse(future.isDone());
+    List<Integer> result = future.get();
+    Assert.assertEquals(result, data);
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(future.jobIds().size(), 1);
+  }
+
+  @Test
+  public void foreachAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Void> future = rdd.map(new IdentityWithDelay<Integer>(200)).foreachAsync(
+      new VoidFunction<Integer>() {
+        @Override
+        public void call(Integer integer) throws Exception {
+          // intentionally left blank.
+        }
+      }
+    );
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertFalse(future.isDone());
+    future.get();
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(future.jobIds().size(), 1);
+  }
+
+  @Test
+  public void countAsync() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.map(new IdentityWithDelay<Integer>(200)).countAsync();
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertFalse(future.isDone());
+    long count = future.get();
+    Assert.assertEquals(count, data.size());
+    Assert.assertFalse(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(future.jobIds().size(), 1);
+  }
+
+  @Test
+  public void testAsyncActionCancellation() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.map(new IdentityWithDelay<Integer>(200)).countAsync();
+    Thread.sleep(200);
+    future.cancel(true);
+    Assert.assertTrue(future.isCancelled());
+    Assert.assertTrue(future.isDone());
+    try {
+      long count = future.get(2000, TimeUnit.MILLISECONDS);
+      Assert.fail("Expected future.get() for cancelled job to throw CancellationException");
+    } catch (CancellationException ignored) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testAsyncActionErrorWrapping() throws Exception {
+    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
+    JavaRDD<Integer> rdd = sc.parallelize(data, 1);
+    JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
+    Thread.sleep(200);
+    try {
+      long count = future.get(2000, TimeUnit.MILLISECONDS);
+      Assert.fail("Expected future.get() for failed job to throw ExecutionException");
+    } catch (ExecutionException ignored) {
+      // pass
+    }
+    Assert.assertTrue(future.isDone());
+  }
+
+
   /**
    * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
    * since that's the only artifact where Guava classes have been relocated.

From ff28e49d990577635fa148bd57461a387bd3466d Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 10 Oct 2014 22:32:57 -0700
Subject: [PATCH 2/7] Add MiMa excludes and fix a scalastyle error.
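
MiMa flags the new async methods on JavaRDDLike because adding a method to a
trait is a binary-incompatible change for any code outside Spark that
implements the trait. JavaRDDLike is only meant to be implemented by Spark's
own Java API wrappers, so the new exclusions below are safe. The scalastyle
error was the missing newline at the end of FutureAction.scala.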
--- .../main/scala/org/apache/spark/FutureAction.scala | 2 +- project/MimaExcludes.scala | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 005f049c28880..80ce2093cacdd 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -325,4 +325,4 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S } } -} \ No newline at end of file +} diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index d499302124461..e164dd8d332e0 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -51,6 +51,19 @@ object MimaExcludes { // MapStatus should be private[spark] ProblemFilters.exclude[IncompatibleTemplateDefProblem]( "org.apache.spark.scheduler.MapStatus") + ) ++ + Seq( + // Adding new methods to the JavaRDDLike trait: + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.takeAsync"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachPartitionAsync"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.countAsync"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.foreachAsync"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.collectAsync") ) case v if v.startsWith("1.1") => From 6f8f6ac668d74a3164bcf037f09c8353134b53f6 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 10 Oct 2014 22:47:11 -0700 Subject: [PATCH 3/7] Fix import ordering. --- core/src/test/java/org/apache/spark/JavaAPISuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 8c7dc04cd1068..107ce5cedaccd 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -22,7 +22,6 @@ import java.util.*; import java.util.concurrent.*; -import org.apache.spark.api.java.*; import scala.Tuple2; import scala.Tuple3; import scala.Tuple4; @@ -45,6 +44,7 @@ import org.junit.Before; import org.junit.Test; +import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.*; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.partial.BoundedDouble; From 7a1417fbb8d87e175850fcb2744dcb31770ef051 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 14 Oct 2014 11:38:34 -0700 Subject: [PATCH 4/7] Removed unnecessary java.util import. 
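
The bare `java.util` import added to JavaRDDLike in the first commit is
redundant: foreachPartitionAsync refers to `java.util.Iterator` by its fully
qualified name, and the other java.util types are imported explicitly.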
--- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 807fcd3db4733..2726a8c633b78 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,7 +17,6 @@ package org.apache.spark.api.java -import java.util import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable, Long => JLong} From e8e28673858ce3c6e6af7bd968039cfb4b221371 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 14 Oct 2014 15:08:21 -0700 Subject: [PATCH 5/7] Updates based on Marcelo's review feedback --- .../scala/org/apache/spark/FutureAction.scala | 11 ++-- .../apache/spark/api/java/JavaRDDLike.scala | 13 ++--- .../java/org/apache/spark/JavaAPISuite.java | 56 +++++++------------ 3 files changed, 30 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 80ce2093cacdd..d5c8f9d76c476 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.util.Collections import java.util.concurrent.TimeUnit import org.apache.spark.api.java.JavaFutureAction @@ -285,12 +286,12 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S override def isDone: Boolean = { // According to java.util.Future's Javadoc, this returns True if the task was completed, - // whether that completion was due to succesful execution, an exception, or a cancellation. + // whether that completion was due to successful execution, an exception, or a cancellation. futureAction.isCancelled || futureAction.isCompleted } override def jobIds(): java.util.List[java.lang.Integer] = { - new java.util.ArrayList(futureAction.jobIds.map(x => new Integer(x)).asJava) + Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava) } private def getImpl(timeout: Duration): T = { @@ -300,10 +301,10 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S case scala.util.Success(value) => converter(value) case Failure(exception) => if (isCancelled) { - throw new CancellationException("Job cancelled: ${exception.message}"); + throw new CancellationException("Job cancelled").initCause(exception) } else { // java.util.Future.get() wraps exceptions in ExecutionException - throw new ExecutionException("Exception thrown by job: ", exception) + throw new ExecutionException("Exception thrown by job", exception) } } } @@ -313,7 +314,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S override def get(timeout: Long, unit: TimeUnit): T = getImpl(Duration.fromNanos(unit.toNanos(timeout))) - override def cancel(mayInterruptIfRunning: Boolean): Boolean = { + override def cancel(mayInterruptIfRunning: Boolean): Boolean = synchronized { if (isDone) { // According to java.util.Future's Javadoc, this should return false if the task is completed. 
false diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 2726a8c633b78..0ea762b671cb2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -21,12 +21,14 @@ import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable, Long => JLong} import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark._ +import org.apache.spark.SparkContext._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag @@ -578,8 +580,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * future for counting the number of elements in this RDD. */ def countAsync(): JavaFutureAction[JLong] = { - import org.apache.spark.SparkContext._ - new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), x => new JLong(x)) + new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf) } /** @@ -587,8 +588,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * retrieving an array containing all of the elements in this RDD. */ def collectAsync(): JavaFutureAction[JList[T]] = { - import org.apache.spark.SparkContext._ - new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => new java.util.ArrayList(x)) + new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava) } /** @@ -596,8 +596,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * future for retrieving the first `num` elements of this RDD. */ def takeAsync(num: Int): JavaFutureAction[JList[T]] = { - import org.apache.spark.SparkContext._ - new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => new java.util.ArrayList(x)) + new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava) } /** @@ -605,7 +604,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * applies a function f to all the elements of this RDD. */ def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = { - import org.apache.spark.SparkContext._ new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)), { x => null.asInstanceOf[Void] }) } @@ -615,7 +613,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * applies a function f to each partition of this RDD. 
    */
   def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
-    import org.apache.spark.SparkContext._
     new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
       { x => null.asInstanceOf[Void] })
   }
 }
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 107ce5cedaccd..256b3e4fe523e 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -30,6 +30,7 @@
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.base.Throwables;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -1306,21 +1307,6 @@ public void collectUnderlyingScalaRDD() {
     Assert.assertEquals(data.size(), collected.length);
   }
 
-  private static final class IdentityWithDelay<T> implements Function<T, T> {
-
-    final int delayMillis;
-
-    IdentityWithDelay(int delayMillis) {
-      this.delayMillis = delayMillis;
-    }
-
-    @Override
-    public T call(T x) throws Exception {
-      Thread.sleep(delayMillis);
-      return x;
-    }
-  }
-
   private static final class BuggyMapFunction<T> implements Function<T, T> {
 
     @Override
@@ -1333,22 +1319,19 @@ public T call(T x) throws Exception {
   public void collectAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<List<Integer>> future =
-        rdd.map(new IdentityWithDelay<Integer>(200)).collectAsync();
-    Assert.assertFalse(future.isCancelled());
-    Assert.assertFalse(future.isDone());
+    JavaFutureAction<List<Integer>> future = rdd.collectAsync();
     List<Integer> result = future.get();
-    Assert.assertEquals(result, data);
+    Assert.assertEquals(data, result);
     Assert.assertFalse(future.isCancelled());
     Assert.assertTrue(future.isDone());
-    Assert.assertEquals(future.jobIds().size(), 1);
+    Assert.assertEquals(1, future.jobIds().size());
   }
 
   @Test
   public void foreachAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Void> future = rdd.map(new IdentityWithDelay<Integer>(200)).foreachAsync(
+    JavaFutureAction<Void> future = rdd.foreachAsync(
       new VoidFunction<Integer>() {
         @Override
         public void call(Integer integer) throws Exception {
@@ -1356,39 +1339,39 @@ public void call(Integer integer) throws Exception {
         }
       }
     );
-    Assert.assertFalse(future.isCancelled());
-    Assert.assertFalse(future.isDone());
     future.get();
     Assert.assertFalse(future.isCancelled());
     Assert.assertTrue(future.isDone());
-    Assert.assertEquals(future.jobIds().size(), 1);
+    Assert.assertEquals(1, future.jobIds().size());
   }
 
   @Test
   public void countAsync() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Long> future = rdd.map(new IdentityWithDelay<Integer>(200)).countAsync();
-    Assert.assertFalse(future.isCancelled());
-    Assert.assertFalse(future.isDone());
+    JavaFutureAction<Long> future = rdd.countAsync();
     long count = future.get();
-    Assert.assertEquals(count, data.size());
+    Assert.assertEquals(data.size(), count);
     Assert.assertFalse(future.isCancelled());
     Assert.assertTrue(future.isDone());
-    Assert.assertEquals(future.jobIds().size(), 1);
+    Assert.assertEquals(1, future.jobIds().size());
   }
 
   @Test
   public void testAsyncActionCancellation() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
-    JavaFutureAction<Long> future = rdd.map(new IdentityWithDelay<Integer>(200)).countAsync();
-    Thread.sleep(200);
+    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() {
+      @Override
+      public void call(Integer integer) throws Exception {
+        Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled.
+      }
+    });
     future.cancel(true);
     Assert.assertTrue(future.isCancelled());
     Assert.assertTrue(future.isDone());
     try {
-      long count = future.get(2000, TimeUnit.MILLISECONDS);
+      future.get(2000, TimeUnit.MILLISECONDS);
       Assert.fail("Expected future.get() for cancelled job to throw CancellationException");
     } catch (CancellationException ignored) {
       // pass
@@ -1400,12 +1383,11 @@ public void testAsyncActionErrorWrapping() throws Exception {
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
     JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
-    Thread.sleep(200);
     try {
-      long count = future.get(2000, TimeUnit.MILLISECONDS);
+      long count = future.get(2, TimeUnit.SECONDS);
       Assert.fail("Expected future.get() for failed job to throw ExecutionException");
-    } catch (ExecutionException ignored) {
-      // pass
+    } catch (ExecutionException ee) {
+      Assert.assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));
     }
     Assert.assertTrue(future.isDone());
   }

From c0153a59b72bb37eaa15dff57ad75121bd27bfe4 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Tue, 14 Oct 2014 17:15:32 -0700
Subject: [PATCH 6/7] Remove unused variable.

---
 core/src/test/java/org/apache/spark/JavaAPISuite.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 256b3e4fe523e..687d410709d94 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1384,7 +1384,7 @@ public void testAsyncActionErrorWrapping() throws Exception {
     JavaRDD<Integer> rdd = sc.parallelize(data, 1);
     JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync();
     try {
-      long count = future.get(2, TimeUnit.SECONDS);
+      future.get(2, TimeUnit.SECONDS);
       Assert.fail("Expected future.get() for failed job to throw ExecutionException");
     } catch (ExecutionException ee) {
       Assert.assertTrue(Throwables.getStackTraceAsString(ee).contains("Custom exception!"));

From 0d45fbc9e41c8dc2fffd58a0a48c19a6d9dafdd8 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 19 Oct 2014 00:19:35 -0700
Subject: [PATCH 7/7] Whitespace fix.

---
 project/MimaExcludes.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 383a95dc737cf..c58666af84f24 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -67,7 +67,7 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.api.java.JavaRDDLike.collectAsync")
       )
-      
+
     case v if v.startsWith("1.1") =>
      Seq(
        MimaBuild.excludeSparkPackage("deploy"),
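
A minimal usage sketch of the new Java API (not taken from the patches above;
it assumes an already-constructed JavaSparkContext named `sc`):

  import java.util.Arrays;
  import java.util.List;
  import java.util.concurrent.TimeUnit;

  import org.apache.spark.api.java.JavaFutureAction;
  import org.apache.spark.api.java.JavaRDD;

  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

  // Kick off the count; the calling thread is not blocked.
  JavaFutureAction<Long> future = rdd.countAsync();

  // The Spark job IDs backing this action, e.g. for monitoring.
  List<Integer> jobIds = future.jobIds();

  // Block with a timeout; get() throws ExecutionException if the job failed
  // and CancellationException if the action was cancelled first.
  Long count = future.get(30, TimeUnit.SECONDS);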