From a72b75161dd4ea5ee71ce59c09cf79ac717816a9 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Sun, 28 Sep 2014 12:33:46 -0700
Subject: [PATCH 1/3] Force test run

---
 sql/core/runTests | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 sql/core/runTests

diff --git a/sql/core/runTests b/sql/core/runTests
new file mode 100644
index 000000000000..e69de29bb2d1

From c85c740a67d0de4da449f05dc0a091a97746e3b3 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Sun, 28 Sep 2014 14:43:07 -0700
Subject: [PATCH 2/3] Revert "[SPARK-1021] Defer the data-driven computation of
 partition bounds in so..."

This reverts commit 2d972fd84ac54a89e416442508a6d4eaeff452c1.
---
 .../scala/org/apache/spark/FutureAction.scala |  7 +-
 .../scala/org/apache/spark/Partitioner.scala  | 29 ++-------
 .../apache/spark/rdd/AsyncRDDActions.scala    | 64 ++++++++-----------
 3 files changed, 34 insertions(+), 66 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index c277c3a47d42..75ea535f2f57 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit,
-      resultFunc: => R): R = {
+      resultFunc: => R) {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
     val job = this.synchronized {
@@ -223,10 +223,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
     try {
-      Await.ready(job, Duration.Inf).value.get match {
-        case scala.util.Failure(e) => throw e
-        case scala.util.Success(v) => v
-      }
+      Await.ready(job, Duration.Inf)
     } catch {
       case e: InterruptedException =>
         job.cancel()
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index d40b152d221c..37053bb6f37a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -29,10 +29,6 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.{CollectionsUtils, Utils}
 import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
 
-import org.apache.spark.SparkContext.rddToAsyncRDDActions
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -117,12 +113,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]
 
   // An array of upper bounds for the first (partitions - 1) partitions
-  @volatile private var valRB: Array[K] = null
-
-  private def rangeBounds: Array[K] = this.synchronized {
-    if (valRB != null) return valRB
-
-    valRB = if (partitions <= 1) {
+  private var rangeBounds: Array[K] = {
+    if (partitions <= 1) {
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -160,8 +152,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         RangePartitioner.determineBounds(candidates, partitions)
       }
     }
-
-    valRB
   }
 
   def numPartitions = rangeBounds.length + 1
@@ -232,8 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = this.synchronized {
-    if (valRB != null) return
+  private def readObject(in: ObjectInputStream) {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()
@@ -245,7 +234,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         val ser = sfactory.newInstance()
         Utils.deserializeViaNestedStream(in, ser) { ds =>
           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
-          valRB = ds.readObject[Array[K]]()
+          rangeBounds = ds.readObject[Array[K]]()
         }
     }
   }
@@ -265,18 +254,12 @@ private[spark] object RangePartitioner {
       sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
     val shift = rdd.id
     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
-    // use collectAsync here to run this job as a future, which is cancellable
-    val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
+    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
       val seed = byteswap32(idx ^ (shift << 16))
       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
         iter, sampleSizePerPartition, seed)
       Iterator((idx, n, sample))
-    }.collectAsync()
-    // We do need the future's value to continue any further
-    val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
-      case scala.util.Success(v) => v.toArray
-      case scala.util.Failure(e) => throw e
-    }
+    }.collect()
     val numItems = sketched.map(_._2.toLong).sum
     (numItems, sketched)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 7a68b3afa815..b62f3fbdc4a1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
-import org.apache.spark.util.Utils
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
 import org.apache.spark.annotation.Experimental
 
@@ -39,30 +38,29 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for counting the number of elements in the RDD.
    */
   def countAsync(): FutureAction[Long] = {
-    val f = new ComplexFutureAction[Long]
-    f.run {
-      val totalCount = new AtomicLong
-      f.runJob(self,
-        (iter: Iterator[T]) => Utils.getIteratorSize(iter),
-        Range(0, self.partitions.size),
-        (index: Int, data: Long) => totalCount.addAndGet(data),
-        totalCount.get())
-    }
+    val totalCount = new AtomicLong
+    self.context.submitJob(
+      self,
+      (iter: Iterator[T]) => {
+        var result = 0L
+        while (iter.hasNext) {
+          result += 1L
+          iter.next()
+        }
+        result
+      },
+      Range(0, self.partitions.size),
+      (index: Int, data: Long) => totalCount.addAndGet(data),
+      totalCount.get())
   }
 
   /**
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val f = new ComplexFutureAction[Seq[T]]
-    f.run {
-      val results = new Array[Array[T]](self.partitions.size)
-      f.runJob(self,
-        (iter: Iterator[T]) => iter.toArray,
-        Range(0, self.partitions.size),
-        (index: Int, data: Array[T]) => results(index) = data,
-        results.flatten.toSeq)
-    }
+    val results = new Array[Array[T]](self.partitions.size)
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+      (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
   /**
@@ -106,34 +104,24 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       }
       results.toSeq
     }
+
+    f
   }
 
   /**
    * Applies a function f to all elements of this RDD.
    */
-  def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    val exprClean = self.context.clean(expr)
-    f.run {
-      f.runJob(self,
-        (iter: Iterator[T]) => iter.foreach(exprClean),
-        Range(0, self.partitions.size),
-        (index: Int, data: Unit) => Unit,
-        Unit)
-    }
+  def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    f.run {
-      f.runJob(self,
-        expr,
-        Range(0, self.partitions.size),
-        (index: Int, data: Unit) => Unit,
-        Unit)
-    }
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 }

From c15a6d379d8ec44f3bb08f6beb3e65a45714e57c Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Sun, 28 Sep 2014 15:27:15 -0700
Subject: [PATCH 3/3] Remove failing test: udf_xpath_double

---
 .../apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 556c984ad392..c7b7dad2f65c 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -850,7 +850,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_when",
     "udf_xpath",
    "udf_xpath_boolean",
-    "udf_xpath_double",
     "udf_xpath_float",
     "udf_xpath_int",
     "udf_xpath_long",
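
Note on the pattern restored by patch 2/3: AsyncRDDActions goes back to calling SparkContext.submitJob
directly, as seen in the "+" lines of that patch. The standalone Scala sketch below illustrates that
submitJob pattern for a collect-style action; the local SparkContext, the example RDD, and the
SubmitJobSketch object are assumptions made for illustration and are not part of these patches.

    // Minimal sketch of the SparkContext.submitJob pattern that patch 2/3 restores in
    // AsyncRDDActions.collectAsync. Assumed for illustration: a local SparkContext and
    // a small example RDD; neither comes from the patches above.
    import scala.concurrent.Await
    import scala.concurrent.duration.Duration

    import org.apache.spark.{FutureAction, SparkConf, SparkContext}

    object SubmitJobSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("submitJob-sketch").setMaster("local[2]"))
        val rdd = sc.parallelize(1 to 100, 4)

        // One result slot per partition; the result handler fills a slot as each partition finishes.
        val results = new Array[Array[Int]](rdd.partitions.size)
        val future: FutureAction[Seq[Int]] = sc.submitJob[Int, Array[Int], Seq[Int]](
          rdd,
          (iter: Iterator[Int]) => iter.toArray,   // processPartition: run on each partition
          0 until rdd.partitions.size,             // partitions to compute
          (index, data) => results(index) = data,  // resultHandler: store each partition's output
          results.flatten.toSeq)                   // resultFunc: the future's value once all partitions finish

        // FutureAction extends scala.concurrent.Future, so it can be awaited or cancelled.
        val collected: Seq[Int] = Await.result(future, Duration.Inf)
        println(collected.size)
        sc.stop()
      }
    }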