7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R): R = {
resultFunc: => R) {
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob
// command need to be in an atomic block.
val job = this.synchronized {
@@ -223,10 +223,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
try {
Await.ready(job, Duration.Inf).value.get match {
case scala.util.Failure(e) => throw e
case scala.util.Success(v) => v
}
Await.ready(job, Duration.Inf)
} catch {
case e: InterruptedException =>
job.cancel()
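Note on the FutureAction.scala hunk above: runJob no longer pattern-matches on the completed Try to rethrow failures at the call site; it only blocks until the job finishes. The following Spark-free sketch (not part of the patch; names are illustrative) contrasts the two await styles on a plain scala.concurrent.Future.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object AwaitStyles extends App {
  // Stand-in for the JobWaiter-backed future awaited by ComplexFutureAction.
  val failing: Future[Int] = Future { throw new RuntimeException("boom") }

  // Old style (removed above): block, inspect the completed Try, rethrow failures here.
  try {
    Await.ready(failing, Duration.Inf).value.get match {
      case Success(v) => println(s"value: $v")
      case Failure(e) => throw e
    }
  } catch {
    case e: RuntimeException => println(s"rethrown at the call site: ${e.getMessage}")
  }

  // New style (added above): only wait for completion; the failure stays inside
  // the future and must be surfaced by whoever later reads its value.
  Await.ready(failing, Duration.Inf)
  println(s"completed with: ${failing.value}") // Some(Failure(java.lang.RuntimeException: boom))
}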
29 changes: 6 additions & 23 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -29,10 +29,6 @@ import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{CollectionsUtils, Utils}
import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}

import org.apache.spark.SparkContext.rddToAsyncRDDActions
import scala.concurrent.Await
import scala.concurrent.duration.Duration

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -117,12 +113,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
private var ordering = implicitly[Ordering[K]]

// An array of upper bounds for the first (partitions - 1) partitions
@volatile private var valRB: Array[K] = null

private def rangeBounds: Array[K] = this.synchronized {
if (valRB != null) return valRB

valRB = if (partitions <= 1) {
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -160,8 +152,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
RangePartitioner.determineBounds(candidates, partitions)
}
}

valRB
}

def numPartitions = rangeBounds.length + 1
@@ -232,8 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = this.synchronized {
if (valRB != null) return
private def readObject(in: ObjectInputStream) {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
@@ -245,7 +234,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
val ser = sfactory.newInstance()
Utils.deserializeViaNestedStream(in, ser) { ds =>
implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
valRB = ds.readObject[Array[K]]()
rangeBounds = ds.readObject[Array[K]]()
}
}
}
@@ -265,18 +254,12 @@ private[spark] object RangePartitioner {
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
// use collectAsync here to run this job as a future, which is cancellable
val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collectAsync()
// We do need the future's value to continue any further
val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
case scala.util.Success(v) => v.toArray
case scala.util.Failure(e) => throw e
}
}.collect()
val numItems = sketched.map(_._2.toLong).sum
(numItems, sketched)
}
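For context on the Partitioner.scala hunks above: the sketch job now uses a plain blocking collect() instead of collectAsync() plus Await, while the per-partition work (seeded reservoir sampling into (idx, count, sample) triples) is unchanged. Below is a Spark-free sketch of that sampling step over local collections; the simplified reservoirSampleAndCount helper and fakePartitions are illustrative stand-ins, not Spark APIs.

import scala.util.Random
import scala.util.hashing.byteswap32

object SketchSketch extends App {
  // Reservoir-sample up to k items from an iterator, also returning the total item count.
  def reservoirSampleAndCount[T](iter: Iterator[T], k: Int, seed: Long): (Vector[T], Long) = {
    val rng = new Random(seed)
    val reservoir = scala.collection.mutable.ArrayBuffer.empty[T]
    var n = 0L
    iter.foreach { item =>
      n += 1
      if (reservoir.size < k) {
        reservoir += item
      } else {
        // Keep the new item with probability k/n, replacing a uniformly chosen slot.
        val j = (rng.nextDouble() * n).toLong
        if (j < k) reservoir(j.toInt) = item
      }
    }
    (reservoir.toVector, n)
  }

  val fakePartitions: Seq[Seq[Int]] = Seq(1 to 100, 200 to 250, 300 to 900).map(_.toSeq)
  val shift = 42 // stands in for rdd.id
  val sampleSizePerPartition = 20

  // Mirrors rdd.mapPartitionsWithIndex { ... }.collect(): one (idx, count, sample) per partition.
  val sketched = fakePartitions.zipWithIndex.map { case (part, idx) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = reservoirSampleAndCount(part.iterator, sampleSizePerPartition, seed)
    (idx, n, sample)
  }
  val numItems = sketched.map(_._2).sum
  println(s"numItems = $numItems, sketch sizes = ${sketched.map(_._3.size)}")
}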
64 changes: 26 additions & 38 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag

import org.apache.spark.util.Utils
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
import org.apache.spark.annotation.Experimental

@@ -39,30 +38,29 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
* Returns a future for counting the number of elements in the RDD.
*/
def countAsync(): FutureAction[Long] = {
val f = new ComplexFutureAction[Long]
f.run {
val totalCount = new AtomicLong
f.runJob(self,
(iter: Iterator[T]) => Utils.getIteratorSize(iter),
Range(0, self.partitions.size),
(index: Int, data: Long) => totalCount.addAndGet(data),
totalCount.get())
}
val totalCount = new AtomicLong
self.context.submitJob(
self,
(iter: Iterator[T]) => {
var result = 0L
while (iter.hasNext) {
result += 1L
iter.next()
}
result
},
Range(0, self.partitions.size),
(index: Int, data: Long) => totalCount.addAndGet(data),
totalCount.get())
}

/**
* Returns a future for retrieving all elements of this RDD.
*/
def collectAsync(): FutureAction[Seq[T]] = {
val f = new ComplexFutureAction[Seq[T]]
f.run {
val results = new Array[Array[T]](self.partitions.size)
f.runJob(self,
(iter: Iterator[T]) => iter.toArray,
Range(0, self.partitions.size),
(index: Int, data: Array[T]) => results(index) = data,
results.flatten.toSeq)
}
val results = new Array[Array[T]](self.partitions.size)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
(index, data) => results(index) = data, results.flatten.toSeq)
}

/**
@@ -106,34 +104,24 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
}
results.toSeq
}

f
}

/**
* Applies a function f to all elements of this RDD.
*/
def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
val f = new ComplexFutureAction[Unit]
val exprClean = self.context.clean(expr)
f.run {
f.runJob(self,
(iter: Iterator[T]) => iter.foreach(exprClean),
Range(0, self.partitions.size),
(index: Int, data: Unit) => Unit,
Unit)
}
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
(index, data) => Unit, Unit)
}

/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
val f = new ComplexFutureAction[Unit]
f.run {
f.runJob(self,
expr,
Range(0, self.partitions.size),
(index: Int, data: Unit) => Unit,
Unit)
}
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
(index, data) => Unit, Unit)
}
}
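For context on the AsyncRDDActions.scala hunks above: the rewritten actions all lean on the same submitJob contract — run processPartition on every partition, pass each result to resultHandler(index, result), then produce the final value from the by-name resultFunc. The sketch below mimics that contract on local collections; submitLocalJob and the sample data are illustrative stand-ins, not SparkContext.submitJob.

import java.util.concurrent.atomic.AtomicLong

object SubmitJobSketch extends App {
  // Eager, local mock of the submitJob shape: process each partition, hand each
  // result to resultHandler(index, result), then evaluate resultFunc for the answer.
  def submitLocalJob[T, U, R](
      partitions: Seq[Seq[T]],
      processPartition: Iterator[T] => U,
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): R = {
    partitions.zipWithIndex.foreach { case (part, index) =>
      resultHandler(index, processPartition(part.iterator))
    }
    resultFunc
  }

  val data: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))

  // count: per-partition sizes aggregated through an AtomicLong, as in countAsync above.
  val totalCount = new AtomicLong
  val count = submitLocalJob[String, Long, Long](
    data,
    iter => { var n = 0L; while (iter.hasNext) { n += 1; iter.next() }; n },
    (_, partCount) => totalCount.addAndGet(partCount),
    totalCount.get())
  println(s"count = $count") // 6

  // collect: per-partition arrays slotted by index, then flattened, as in collectAsync above.
  val results = new Array[Array[String]](data.size)
  val collected = submitLocalJob[String, Array[String], Seq[String]](
    data, _.toArray, (index, arr) => results(index) = arr, results.flatten.toSeq)
  println(s"collected = $collected")
}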
Empty file added sql/core/runTests
@@ -850,7 +850,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"udf_when",
"udf_xpath",
"udf_xpath_boolean",
"udf_xpath_double",
"udf_xpath_float",
"udf_xpath_int",
"udf_xpath_long",