From 4245cee69a11c13ca807f9de30822ded9af3c0f6 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Mon, 21 Dec 2015 10:06:29 -0600 Subject: [PATCH 1/2] SPARK-12450 Un-persist broadcasted variables in KMeans --- .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 11633e8242313..a85c9e1f3d314 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -213,6 +213,8 @@ class KMeans private ( contribs.iterator }.reduceByKey(mergeContribs).collectAsMap() + bcActiveCenters.unpersist(blocking = false) + // Update the cluster centers and costs for each active run for ((run, i) <- activeRuns.zipWithIndex) { var changed = false @@ -324,6 +326,7 @@ class KMeans private ( s0 } ) + bcNewCenters.unpersist(blocking = false) preCosts.unpersist(blocking = false) val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) @@ -353,6 +356,7 @@ class KMeans private ( ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) } }.reduceByKey(_ + _).collectAsMap() + bcCenters.unpersist(blocking = false) val finalCenters = (0 until runs).par.map { r => val myCenters = centers(r).toArray val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray From 974ef4ea046a90390e966e5cc3a3d6cc311a5cc7 Mon Sep 17 00:00:00 2001 From: RJ Nowling Date: Mon, 4 Jan 2016 21:43:05 -0600 Subject: [PATCH 2/2] Add blank lines to fix style issues --- .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index a85c9e1f3d314..9096f03287a73 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -326,8 +326,10 @@ class KMeans private ( s0 } ) + bcNewCenters.unpersist(blocking = false) preCosts.unpersist(blocking = false) + val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) pointsWithCosts.flatMap { case (p, c) => @@ -356,7 +358,9 @@ class KMeans private ( ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) } }.reduceByKey(_ + _).collectAsMap() + bcCenters.unpersist(blocking = false) + val finalCenters = (0 until runs).par.map { r => val myCenters = centers(r).toArray val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray