From 81065aed9987c1b08cd5784b7a6153e26f3f7402 Mon Sep 17 00:00:00 2001
From: Ignacio Zendejas
Date: Thu, 10 Apr 2014 13:43:07 -0700
Subject: [PATCH 1/2] Some minor optimizations, mostly to initiate my first PR
 and familiarize myself with the process.

* Removed SeqLike.reverse calls when sorting in descending order
* Replaced slice(1, length) with more readable tail calls
* Used foldLeft when aggregating the number of documents in the Naive Bayes
  code

(Short reviewer sketches of each rewrite follow the corresponding diffs
below.)
---
 core/src/main/scala/org/apache/spark/Partitioner.scala    | 2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala     | 2 +-
 .../org/apache/spark/deploy/master/ui/IndexPage.scala     | 8 ++++----
 .../org/apache/spark/deploy/worker/ui/IndexPage.scala     | 4 ++--
 .../main/scala/org/apache/spark/ui/jobs/IndexPage.scala   | 9 +++++----
 .../main/scala/org/apache/spark/ui/jobs/PoolPage.scala    | 4 +++-
 .../test/scala/org/apache/spark/rdd/SortingSuite.scala    | 2 +-
 .../spark/streaming/examples/TwitterAlgebirdCMS.scala     | 6 +++---
 .../apache/spark/mllib/api/python/PythonMLLibAPI.scala    | 4 ++--
 .../apache/spark/mllib/classification/NaiveBayes.scala    | 5 +----
 .../mllib/regression/GeneralizedLinearAlgorithm.scala     | 2 +-
 .../scala/org/apache/spark/mllib/tree/DecisionTree.scala  | 2 +-
 .../org/apache/spark/mllib/util/MFDataGenerator.scala     | 2 +-
 .../spark/sql/catalyst/plans/physical/partitioning.scala  | 2 +-
 14 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index ad9988226470..7a821e0fccdb 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -49,7 +49,7 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
+    val bySize = (Seq(rdd) ++ others).sortBy(-_.partitions.size)
     for (r <- bySize if r.partitioner.isDefined) {
       return r.partitioner.get
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 95bd62e88db2..04f2e33d9b3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -481,7 +481,7 @@ private[spark] class Master(
     // Try to spread out each app among all the nodes, until it has all its cores
     for (app <- waitingApps if app.coresLeft > 0) {
       val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+        .filter(canUse(app, _)).sortBy(-_.coresFree)
       val numUsable = usableWorkers.length
       val assigned = new Array[Int](numUsable) // Number of cores to give on each node
       var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 8c1d6c7cce45..21b975db4bd6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -52,16 +52,16 @@ private[spark] class IndexPage(parent: MasterWebUI) {
 
     val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
       "State", "Duration")
-    val activeApps = state.activeApps.sortBy(_.startTime).reverse
+    val activeApps = state.activeApps.sortBy(-_.startTime)
     val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
-    val completedApps = state.completedApps.sortBy(_.endTime).reverse
+    val completedApps = state.completedApps.sortBy(-_.endTime)
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
     val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
       "Main Class")
-    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
+    val activeDrivers = state.activeDrivers.sortBy(-_.startTime)
     val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
-    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
+    val completedDrivers = state.completedDrivers.sortBy(-_.startTime)
     val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
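[Reviewer note, not part of the patch: a minimal sketch of the descending-sort
rewrite above. AppInfo and the sample data are made up for illustration. For a
numeric key, sortBy(-key) sorts in one pass instead of sorting ascending and
then reversing; with duplicate keys the two are not strictly identical, since
sortBy is stable while .reverse also flips the relative order of ties.]

    object DescendingSortSketch extends App {
      case class AppInfo(name: String, startTime: Long)
      val apps = Seq(AppInfo("b", 2L), AppInfo("a", 3L), AppInfo("c", 1L))

      // Original style: sort ascending, then reverse the whole sequence.
      val viaReverse = apps.sortBy(_.startTime).reverse
      // Patched style: negate the numeric key; no extra reversal pass.
      val viaNegation = apps.sortBy(-_.startTime)
      // Equivalent with an explicit reversed Ordering, which also preserves ties.
      val viaOrdering = apps.sortBy(_.startTime)(Ordering[Long].reverse)

      // All three agree here because the keys are distinct.
      assert(viaReverse == viaNegation && viaNegation == viaOrdering)
    }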
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 85200ab0e102..ac40a95c9704 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -53,9 +53,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
       UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
 
     val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
-    val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
+    val runningDrivers = workerState.drivers.sortWith(_.driverId > _.driverId)
     val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
-    val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
+    val finishedDrivers = workerState.finishedDrivers.sortWith(_.driverId > _.driverId)
     def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 70d62b66a482..594103ed29da 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, NodeSeq}
 
-import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.scheduler.{StageInfo, Schedulable}
 import org.apache.spark.ui.Page._
 import org.apache.spark.ui.UIUtils
 
@@ -41,10 +41,11 @@ private[ui] class IndexPage(parent: JobProgressUI) {
       val failedStages = listener.failedStages.reverse.toSeq
       val now = System.currentTimeMillis()
 
-      val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+      val mostRecentlySubmitted = (si: StageInfo) => -si.submissionTime.getOrElse(0L)
+      val activeStagesTable = new StageTable(activeStages.sortBy(mostRecentlySubmitted), parent)
       val completedStagesTable =
-        new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
-      val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+        new StageTable(completedStages.sortBy(mostRecentlySubmitted), parent)
+      val failedStagesTable = new StageTable(failedStages.sortBy(mostRecentlySubmitted), parent)
 
       // For now, pool information is only accessible in live UIs
       val pools = if (live) sc.getAllPools else Seq[Schedulable]()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index bd33182b7005..1bb8b00abbd9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -41,7 +41,9 @@ private[ui] class PoolPage(parent: JobProgressUI) {
         case Some(s) => s.values.toSeq
         case None => Seq[StageInfo]()
       }
-      val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+
+      val mostRecentlySubmitted = (si: StageInfo) => -si.submissionTime.getOrElse(0L)
+      val activeStagesTable = new StageTable(activeStages.sortBy(mostRecentlySubmitted), parent)
 
       // For now, pool information is only accessible in live UIs
       val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]()
diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index d0619559bb45..487cfdcb80b8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -109,7 +109,7 @@ class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers
   test("partition balancing for descending sort") {
     val pairArr = (1 to 1000).map(x => (x, x)).toArray
     val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
-    assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
+    assert(sorted.collect() === pairArr.sortBy(-_._1))
     val partitions = sorted.collectPartitions()
     logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
     val lengthArr = partitions.map(_.length)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 8a654f8fada2..7522eb9e0116 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -89,10 +89,10 @@ object TwitterAlgebirdCMS {
       if (rdd.count() != 0) {
         val partial = rdd.first()
         val partialTopK = partial.heavyHitters.map(id =>
-          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+          (id, partial.frequency(id).estimate)).toSeq.sortBy(-_._2).take(TOPK)
         globalCMS ++= partial
         val globalTopK = globalCMS.heavyHitters.map(id =>
-          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(-_._2).take(TOPK)
         println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
           partialTopK.mkString("[", ",", "]")))
         println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
@@ -107,7 +107,7 @@ object TwitterAlgebirdCMS {
           {case (id, count) => (count, id)})
           .sortByKey(ascending = false).take(TOPK)
         globalExact = mm.plus(globalExact.toMap, partialMap)
-        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        val globalTopK = globalExact.toSeq.sortBy(-_._2).take(TOPK)
         println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
         println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
       }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 2df5b0d02b69..73cb6cc37110 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -114,7 +114,7 @@ class PythonMLLibAPI extends Serializable {
       initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
+      LabeledPoint(x(0), Vectors.dense(x.tail))
     })
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
@@ -243,7 +243,7 @@ class PythonMLLibAPI extends Serializable {
       lambda: Double): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
+      LabeledPoint(x(0), Vectors.dense(x.tail))
     })
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index e956185319a6..57b58a2dd0f1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -91,10 +91,7 @@ class NaiveBayes private (var lambda: Double) extends Serializable with Logging
       (c1._1 + c2._1, c1._2 += c2._2)
     ).collect()
     val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
+    val numDocuments = aggregated.foldLeft(0L){case (curCount, (_, (n, _))) => curCount + n}
     val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
     val labels = new Array[Double](numLabels)
     val pi = new Array[Double](numLabels)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 80dc0f12ff84..41d9f776a9f6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -156,7 +156,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
     val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
     val weights =
       if (addIntercept) {
-        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+        Vectors.dense(weightsWithIntercept.toArray.tail)
       } else {
         weightsWithIntercept
       }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index dee9594a9dd7..dbdad0a7a14e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -1117,7 +1117,7 @@ object DecisionTree extends Serializable with Logging {
     sc.textFile(dir).map { line =>
       val parts = line.trim().split(",")
       val label = parts(0).toDouble
-      val features = Vectors.dense(parts.slice(1,parts.length).map(_.toDouble))
+      val features = Vectors.dense(parts.tail.map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
index 348aba1dea5b..9c3c743d06c1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
@@ -84,7 +84,7 @@ object MFDataGenerator{
 
     val mn = m * n
     val shuffled = rand.shuffle(1 to mn toList)
-    val omega = shuffled.slice(0, sampSize)
+    val omega = shuffled.take(sampSize)
     val ordered = omega.sortWith(_ < _).toArray
     val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
       .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index ffb3a92f8f34..e41f54be97a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -190,7 +190,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
     case OrderedDistribution(requiredOrdering) =>
-      val minSize = Seq(requiredOrdering.size, ordering.size).min
+      val minSize = requiredOrdering.size.min(ordering.size)
       requiredOrdering.take(minSize) == ordering.take(minSize)
     case ClusteredDistribution(requiredClustering) =>
       clusteringSet.subsetOf(requiredClustering.toSet)
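[Reviewer note, not part of the patch: a small sketch of the slice-to-tail and
slice-to-take rewrites in PATCH 1/2, on a made-up feature array. For non-empty
input, x.tail equals x.slice(1, x.length), and take(n) equals slice(0, n).]

    object TailVsSliceSketch extends App {
      // A label followed by its features, as in the LabeledPoint parsing above.
      val x = Array(1.0, 0.5, 0.25)

      val viaSlice = x.slice(1, x.length) // drop the label, keep the features
      val viaTail = x.tail                // same elements, clearer intent
      assert(viaSlice.sameElements(viaTail))

      // slice(0, n) simplifies to take(n) the same way.
      assert(x.slice(0, 2).sameElements(x.take(2)))

      // Caveat: on empty input, slice(1, length) returns an empty collection,
      // while tail throws UnsupportedOperationException.
    }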
From 36888799b95282a5d4bbb5c4d745b2118ca96fa7 Mon Sep 17 00:00:00 2001
From: Ignacio Zendejas
Date: Mon, 21 Apr 2014 17:11:37 -0700
Subject: [PATCH 2/2] Reverted changes per review feedback, prior to a quick PR.

---
 core/src/main/scala/org/apache/spark/Partitioner.scala    | 2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala     | 2 +-
 .../org/apache/spark/deploy/master/ui/IndexPage.scala     | 8 ++++----
 .../org/apache/spark/deploy/worker/ui/IndexPage.scala     | 4 ++--
 .../test/scala/org/apache/spark/rdd/SortingSuite.scala    | 2 +-
 .../spark/streaming/examples/TwitterAlgebirdCMS.scala     | 6 +++---
 .../apache/spark/mllib/classification/NaiveBayes.scala    | 5 ++++-
 .../spark/sql/catalyst/plans/physical/partitioning.scala  | 2 +-
 8 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 7a821e0fccdb..ad9988226470 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -49,7 +49,7 @@ object Partitioner {
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
-    val bySize = (Seq(rdd) ++ others).sortBy(-_.partitions.size)
+    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
     for (r <- bySize if r.partitioner.isDefined) {
       return r.partitioner.get
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 04f2e33d9b3f..95bd62e88db2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -481,7 +481,7 @@ private[spark] class Master(
     // Try to spread out each app among all the nodes, until it has all its cores
     for (app <- waitingApps if app.coresLeft > 0) {
       val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-        .filter(canUse(app, _)).sortBy(-_.coresFree)
+        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
       val numUsable = usableWorkers.length
       val assigned = new Array[Int](numUsable) // Number of cores to give on each node
       var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 21b975db4bd6..8c1d6c7cce45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -52,16 +52,16 @@ private[spark] class IndexPage(parent: MasterWebUI) {
 
     val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
       "State", "Duration")
-    val activeApps = state.activeApps.sortBy(-_.startTime)
+    val activeApps = state.activeApps.sortBy(_.startTime).reverse
     val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
-    val completedApps = state.completedApps.sortBy(-_.endTime)
+    val completedApps = state.completedApps.sortBy(_.endTime).reverse
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
     val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
       "Main Class")
-    val activeDrivers = state.activeDrivers.sortBy(-_.startTime)
+    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
     val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
-    val completedDrivers = state.completedDrivers.sortBy(-_.startTime)
+    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
     val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
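[Reviewer note, not part of the patch: driverId is a String key, so the
numeric-negation trick from the other call sites does not apply; PATCH 1/2
used sortWith instead, which the diff just below reverts. A sketch with
made-up IDs:]

    object StringKeyDescendingSketch extends App {
      val driverIds = Seq("driver-002", "driver-010", "driver-001")

      val viaReverse = driverIds.sorted.reverse                    // original style
      val viaSortWith = driverIds.sortWith(_ > _)                  // PATCH 1/2 style
      val viaOrdering = driverIds.sorted(Ordering[String].reverse) // explicit reversed Ordering

      assert(viaReverse == viaSortWith && viaSortWith == viaOrdering)
    }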
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index ac40a95c9704..85200ab0e102 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -53,9 +53,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
       UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
 
     val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
-    val runningDrivers = workerState.drivers.sortWith(_.driverId > _.driverId)
+    val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
     val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
-    val finishedDrivers = workerState.finishedDrivers.sortWith(_.driverId > _.driverId)
+    val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
     def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index 487cfdcb80b8..d0619559bb45 100644
--- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -109,7 +109,7 @@ class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers
   test("partition balancing for descending sort") {
     val pairArr = (1 to 1000).map(x => (x, x)).toArray
     val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
-    assert(sorted.collect() === pairArr.sortBy(-_._1))
+    assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
     val partitions = sorted.collectPartitions()
     logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
     val lengthArr = partitions.map(_.length)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 7522eb9e0116..6729d822d4a1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -89,10 +89,10 @@ object TwitterAlgebirdCMS {
       if (rdd.count() != 0) {
         val partial = rdd.first()
         val partialTopK = partial.heavyHitters.map(id =>
-          (id, partial.frequency(id).estimate)).toSeq.sortBy(-_._2).take(TOPK)
+          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.take(TOPK)
         globalCMS ++= partial
         val globalTopK = globalCMS.heavyHitters.map(id =>
-          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(-_._2).take(TOPK)
+          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.take(TOPK)
         println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
           partialTopK.mkString("[", ",", "]")))
         println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
@@ -107,7 +107,7 @@ object TwitterAlgebirdCMS {
           {case (id, count) => (count, id)})
           .sortByKey(ascending = false).take(TOPK)
         globalExact = mm.plus(globalExact.toMap, partialMap)
-        val globalTopK = globalExact.toSeq.sortBy(-_._2).take(TOPK)
+        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.take(TOPK)
         println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
         println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
       }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 57b58a2dd0f1..e956185319a6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -91,7 +91,10 @@ class NaiveBayes private (var lambda: Double) extends Serializable with Logging
       (c1._1 + c2._1, c1._2 += c2._2)
     ).collect()
     val numLabels = aggregated.length
-    val numDocuments = aggregated.foldLeft(0L){case (curCount, (_, (n, _))) => curCount + n}
+    var numDocuments = 0L
+    aggregated.foreach { case (_, (n, _)) =>
+      numDocuments += n
+    }
     val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
     val labels = new Array[Double](numLabels)
     val pi = new Array[Double](numLabels)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index e41f54be97a7..de8a4aff111e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -190,7 +190,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
     case OrderedDistribution(requiredOrdering) =>
-      val minSize = requiredOrdering.size.min(ordering.size)
+      val minSize = math.min(requiredOrdering.size, ordering.size)
       requiredOrdering.take(minSize) == ordering.take(minSize)
     case ClusteredDistribution(requiredClustering) =>
       clusteringSet.subsetOf(requiredClustering.toSet)
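[Reviewer note, not part of the patch: the NaiveBayes revert above trades the
foldLeft back for a local var. A sketch of the two aggregation styles over
made-up per-label counts; the feature vectors are stubbed out as strings:]

    object DocCountSketch extends App {
      // (label, (docCount, featureSum)) pairs, shaped like the collected
      // aggregate in NaiveBayes.run.
      val aggregated = Array((0.0, (3L, "v0")), (1.0, (2L, "v1")))

      // foldLeft style from PATCH 1/2:
      val viaFold = aggregated.foldLeft(0L) { case (curCount, (_, (n, _))) => curCount + n }

      // var-and-foreach style restored in PATCH 2/2:
      var viaVar = 0L
      aggregated.foreach { case (_, (n, _)) => viaVar += n }

      // Simplest equivalent, at the cost of an intermediate collection:
      val viaMapSum = aggregated.map(_._2._1).sum

      assert(viaFold == 5L && viaVar == 5L && viaMapSum == 5L)
    }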