diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c2ebf3059621..8e169e78fc1d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{ Failure, Success }
import org.apache.spark.rpc._
import org.apache.spark._
@@ -33,7 +33,21 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ ThreadUtils, SignalLogger, Utils }
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.concurrent._
+import java.lang.management._
+import java.util.List
+import java.util.Date
+import scala.collection.JavaConversions._
+import java.text._
+import scala.util.Properties
+import scala.sys.process._
+// han sampler import end
private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -44,7 +58,6 @@ private[spark] class CoarseGrainedExecutorBackend(
userClassPath: Seq[URL],
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
-
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None
@@ -62,7 +75,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
+ Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutorResponse
}
case Failure(e) => {
logError(s"Cannot register with driver: $driverUrl", e)
@@ -80,6 +93,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor(hostname) =>
logInfo("Successfully registered with driver")
+// val (hostname, _) = Utils.parseHostPort(hostPort)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
@@ -111,7 +125,7 @@ private[spark] class CoarseGrainedExecutorBackend(
// a message to self to actually do the shutdown.
self.send(Shutdown)
- case Shutdown =>
+ case Shutdown =>
executor.stop()
stop()
rpcEnv.shutdown()
@@ -138,13 +152,13 @@ private[spark] class CoarseGrainedExecutorBackend(
private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
- driverUrl: String,
- executorId: String,
- hostname: String,
- cores: Int,
- appId: String,
- workerUrl: Option[String],
- userClassPath: Seq[URL]) {
+ driverUrl: String,
+ executorId: String,
+ hostname: String,
+ cores: Int,
+ appId: String,
+ workerUrl: Option[String],
+ userClassPath: Seq[URL]) {
SignalLogger.register(log)
@@ -161,7 +175,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
port,
executorConf,
new SecurityManager(executorConf),
- clientMode = true)
+ clientMode=true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
Seq[(String, String)](("spark.app.id", appId))
@@ -202,7 +216,50 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}
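+ // Helper added by the sampler: estimates a process's resident memory (in bytes) and its CPU
+ // share by parsing one line of "top -b" output. Assumes the standard procps column layout
+ // (6th column = RES, 9th column = %CPU) and normalizes CPU by NUMCPU, here hard-coded to 8 cores.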
+ def getProcessUsedMemoryAndCPU(processID: String): (Double, Double) = {
+ var memory = 0d;
+ var CPU = 0d;
+ val NUMCPU = 8;
+ var process = ""
+/* val commands = new java.util.Vector[String]()
+ commands.add("/bin/bash")
+ commands.add("-c")
+ commands.add("ps aux | grep " + processID)
+ val pb=new java.lang.ProcessBuilder(commands)
+ val pr=pb.start()
+ pr.waitFor()
+ if (pr.exitValue()==0) {
+ val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pr.getInputStream()));
+ var source = ""
+ source = outReader.readLine()
+ while(source != null) {
+ try {
+ val tokens = source.split(" +")
+ if(tokens(1).equals(processID)) {
+ memory = 1024L*tokens(5).toLong
+ CPU = tokens(2).toDouble / NUMCPU
+ }
+ } catch { case e: Exception => () }
+ finally {
+ source = outReader.readLine()
+ }
+ }
+ }
+*/
+
+ val topout = Seq("/bin/sh", "-c", "top -n 1 -b -p " + processID + " | tail -1").!!.trim.split(" +")
+ val len = topout(5).length
+ if(topout(5).endsWith("g")) { memory = 1024L*1024L*1024L*topout(5).take(len-1).toDouble }
+ else if(topout(5).endsWith("m")) { memory = 1024L*1024L*topout(5).take(len-1).toDouble }
+ else if(topout(5).endsWith("k")) { memory = 1024L*topout(5).take(len-1).toDouble }
+ else { memory = topout(5).toDouble }
+ CPU = topout(8).toDouble / NUMCPU
+
+ return (memory, CPU)
+ }
+
def main(args: Array[String]) {
+
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
@@ -250,7 +307,186 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
printUsageAndExit()
}
+ // han sampler 1 begin
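+ // Executor-side sampler: every SAMPLING_PERIOD ms a background task appends one tab-separated
+ // row of JVM memory-pool, heap/non-heap, off-heap, and CPU readings (plus top-based stats for
+ // this process, the DataNode, and the NodeManager) to a file under
+ // $SPARK_HOME/logs/<appId>/<executorId>, and dumps a jmap class histogram every JMAP_PERIOD ms.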
+ val SAMPLING_PERIOD: Long = 10
+ val JMAP_PERIOD: Long = 5000
+ val TIMESTAMP_PERIOD: Long = 1000
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_executor = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + appId + "/" + executorId
+ val dir_executor = new File(dirname_executor)
+ if (!dir_executor.exists())
+ dir_executor.mkdirs()
+ val dirname_histo = dirname_executor + "/histo"
+ val dir_histo = new File(dirname_histo)
+ if (!dir_histo.exists())
+ dir_histo.mkdirs()
+
+ // val writer = new FileWriter(new File("/home/ubuntu/sparkOutput/sparkOutput_executor_" + System.nanoTime() + ".txt"), true)
+ val writer = new FileWriter(new File(dirname_executor + "/" + "sparkOutput_worker_" + appId + "_" + executorId + ".txt"), true)
+ writer.write(appId + "_" + executorId + "\n")
+ writer.flush()
+
+ var osBean: com.sun.management.OperatingSystemMXBean = ManagementFactory.getPlatformMXBean(classOf[com.sun.management.OperatingSystemMXBean])
+ var availableProcessors: Int = osBean.getAvailableProcessors()
+ logInfo("Number of available processors for executor " + executorId + ": " + availableProcessors)
+ var avgUsedCPU: Double = 0
+ var numberOfCPUSamples: Long = 1
+ var memBean: MemoryMXBean = ManagementFactory.getMemoryMXBean()
+ var rtBean: RuntimeMXBean = ManagementFactory.getRuntimeMXBean()
+ var mpBeans: List[MemoryPoolMXBean] = ManagementFactory.getMemoryPoolMXBeans()
+ var s: String = ""
+ mpBeans.foreach {
+
+ b =>
+ s += b.getName() + "\t";
+ }
+ s += "Used heap\tCommitted heap\tMax heap\tUsed nonheap\tCommitted nonheap\tMax nonheap\tTotal memory\tUsed CPU"
+ writer.write(s + "\n")
+ writer.flush()
+
+ var mBeans: List[MemoryPoolMXBean] = null
+ var prevUpTime: Double = 0
+ var prevProcessCPUTime: Double = 0
+ var upTime: Double = 0
+ var processCPUTime: Double = 0
+
+ var processID: String = ""
+ var datanodePID: String = ""
+ var nodemanagerPID: String = ""
+
+/* val commands = new java.util.Vector[String]()
+ commands.add("/bin/bash")
+ commands.add("-c")
+ commands.add("echo $PPID")
+ val pb=new java.lang.ProcessBuilder(commands)
+ val pr=pb.start()
+ pr.waitFor()
+ if (pr.exitValue()==0) {
+ val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pr.getInputStream()));
+ processID = outReader.readLine().trim()
+ println("Found process ID: " + processID)
+ } else {
+ println("Error while getting PID")
+ }
+*/
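+ // The RuntimeMXBean name has the form "<pid>@<hostname>", so the executor's own PID is the
+ // substring before '@'.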
+ val pname = ManagementFactory.getRuntimeMXBean().getName()
+ processID = pname.substring(0, pname.indexOf('@'))
+
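+ // Use jps to find the PIDs of the co-located HDFS DataNode and YARN NodeManager so their
+ // resource usage can be sampled alongside the executor's.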
+ val jps = new java.util.Vector[String]()
+ jps.add("/bin/bash")
+ jps.add("-c")
+ jps.add("jps")
+ val pbi=new java.lang.ProcessBuilder(jps)
+ val pri = pbi.start()
+ pri.waitFor()
+ if (pri.exitValue()==0) {
+ val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pri.getInputStream()));
+ var source = ""
+ source = outReader.readLine()
+ while(source != null) {
+ try {
+ val tokens = source.split(" +")
+ if(tokens(1).equals("DataNode")) {
+ datanodePID = tokens(0)
+ //println("Found datanode PID: " + datanodePID)
+ }
+ if(tokens(1).equals("NodeManager")) {
+ nodemanagerPID = tokens(0)
+ //println("Found nodemanager pid: " + nodemanagerPID)
+ }
+ } catch { case e: Exception => () }
+ finally {
+ source = outReader.readLine()
+ }
+ }
+ } else {
+ println("Error while getting jps output")
+ }
+
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ ex.setRemoveOnCancelPolicy(true)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ s = ""
+ mpBeans.foreach {
+ b =>
+ s += b.getUsage().getUsed() + "\t";
+ }
+ s += memBean.getHeapMemoryUsage().getUsed() + "\t"
+ s += memBean.getHeapMemoryUsage().getCommitted() + "\t"
+ s += memBean.getHeapMemoryUsage().getMax() + "\t"
+ s += memBean.getNonHeapMemoryUsage().getUsed() + "\t"
+ s += memBean.getNonHeapMemoryUsage().getCommitted() + "\t"
+ s += memBean.getNonHeapMemoryUsage().getMax() + "\t"
+ // get used memory from jmap
+ // val jmapout = Seq("/bin/sh", "-c", "jmap -histo " + processID + " | tail -1").!!.trim
+ // s += jmapout.split(" +")(2)
+
+ // record off heap memory usage
+ s += org.apache.spark.unsafe.Platform.TOTAL_BYTES;
+
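+ // CPU usage over the last interval: delta of process CPU time divided by delta of JVM uptime
+ // (scaled to comparable units) and by the processor count, capped at 99%.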
+ upTime = rtBean.getUptime() * 10000
+ processCPUTime = osBean.getProcessCpuTime()
+ var elapsedCPU: Double = processCPUTime - prevProcessCPUTime
+ var elapsedTime: Double = upTime - prevUpTime
+ var usedCPU: Double = 0
+ if (elapsedTime > 0.0) {
+ usedCPU = math.min(99.0, elapsedCPU / (elapsedTime * availableProcessors))
+ avgUsedCPU += usedCPU
+ numberOfCPUSamples += 1
+ }
+ s += "\t" + usedCPU.toString()
+ prevUpTime = upTime
+ prevProcessCPUTime = processCPUTime
+
+ var res: (Double, Double) = (0, 0);
+ try { res = getProcessUsedMemoryAndCPU(processID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() }
+
+ try { res = getProcessUsedMemoryAndCPU(datanodePID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() }
+ try { res = getProcessUsedMemoryAndCPU(nodemanagerPID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() }
+
+ if (i % TIMESTAMP_PERIOD == 0) {
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+
+ if (i % JMAP_PERIOD == 0) {
+ var time: String = dateFormat.format(new Date())
+ val pname = ManagementFactory.getRuntimeMXBean().getName()
+ val pid = pname.substring(0, pname.indexOf('@'))
+ val command = "jmap -histo " + pid
+ val result = command.!!
+ val writer1 = new FileWriter(new File(dirname_histo + "/" + "sparkOutput_worker_" + appId + "_" + executorId + "_" + time + ".txt"), true)
+ writer1.write(result)
+ writer1.flush()
+ writer1.close()
+ }
+
+
+ i = i + SAMPLING_PERIOD
+ writer.write(s + "\n")
+ writer.flush()
+
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
+
+ // han sampler 2 begin
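+ // run() has returned, so the executor has shut down: log the mean of the collected CPU
+ // samples and stop the sampler before exiting.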
+ avgUsedCPU /= numberOfCPUSamples
+ logInfo("Average used CPU of executor " + executorId + ": " + avgUsedCPU)
+
+ f.cancel(true)
+ writer.flush()
+ writer.close()
+ // han sampler 2 end
}
private def printUsageAndExit() = {
@@ -273,3 +509,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}
+
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index e707e27d96b5..a5b9f142f1e7 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -182,6 +182,10 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
+//println("-- spark.memory.offHeap.enabled" + conf.getBoolean("spark.memory.offHeap.enabled", false));
+//println("-- spark.memory.offHeap.size" + conf.getSizeAsBytes("spark.memory.offHeap.size"));
+//MemoryMode.OFF_HEAP
+
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
@@ -189,6 +193,7 @@ private[spark] abstract class MemoryManager(
} else {
MemoryMode.ON_HEAP
}
+
}
/**
diff --git a/examples/pom.xml b/examples/pom.xml
index f5ab2a7fdc09..d6aa6af5153a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -41,6 +41,11 @@
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-core</artifactId>
+      <version>0.9</version>
+    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/examples/src/main/scala/org/apache/spark/examples/Sort.scala b/examples/src/main/scala/org/apache/spark/examples/Sort.scala
new file mode 100644
index 000000000000..352e69e563a2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/Sort.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import scala.Tuple2
+import org.apache.spark.SparkConf
+
+import org.apache.spark._
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
+object Sort {
+
+ val SPACE = Pattern.compile(" ")
+
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ println("Usage: WordCount ");
+ System.exit(1);
+ }
+
+ val conf = new SparkConf().setAppName("Sort")
+
+ // han register kryo classes begin
+
+// val tuple3ArrayClass = classOf[Array[Tuple3[Any, Any, Any]]]
+// val anonClass = Class.forName("scala.reflect.ClassTag$$anon$1")
+// val javaClassClass = classOf[java.lang.Class[Any]]
+// val StringClass = classOf[java.lang.String]
+// val StringArrayClass = classOf[Array[java.lang.String]]
+
+// conf.registerKryoClasses(Array(tuple3ArrayClass, anonClass, javaClassClass, StringClass, StringArrayClass))
+
+ // han register kryo classes end
+
+ val sc = new SparkContext(conf)
+
+ // han sampler 1 begin
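+ // Driver-side sampler: every SAMPLING_PERIOD ms, append each executor's block-storage memory
+ // usage (StorageStatus.memUsed) to a per-BlockManager file under $SPARK_HOME/logs/<applicationId>,
+ // adding a wall-clock timestamp roughly once per TIMESTAMP_PERIOD ms of samples.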
+ val SAMPLING_PERIOD: Long = 10
+ val TIMESTAMP_PERIOD: Long = 1000
+
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+ val dir_application = new File(dirname_application)
+ if (!dir_application.exists())
+ dir_application.mkdirs()
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+ sc.getExecutorStorageStatus.foreach {
+ es =>
+ val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+ val file = new File(filename)
+ val writer = new FileWriter(file, true)
+ if (!file.exists()) {
+ file.createNewFile()
+ writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+ writer.flush()
+ //writer.close()
+ }
+
+ var s = es.memUsed.toString()
+ //println(s)
+ if (i % TIMESTAMP_PERIOD == 0) {
+ i = 0
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+ writer.write(s + "\n")
+ writer.flush()
+ writer.close()
+ }
+ i = i + SAMPLING_PERIOD
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
+ val lines = sc.textFile(args(0), 1)
+ //val parallel = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)/2
+ val data = lines.map((_, 1))
+ //val partitioner = new HashPartitioner(partitions = parallel)
+ val sorted = data.sortByKey().map(_._1)
+
+ //val output = counts.collect()
+ //output.foreach(t => println(t._1 + ": " + t._2))
+ sorted.saveAsTextFile(args(1))
+
+ sc.stop()
+
+ // han sampler 2 begin
+ f.cancel(true)
+ // han sampler 2 end
+ sys.exit(0)
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/SortDF.scala b/examples/src/main/scala/org/apache/spark/examples/SortDF.scala
new file mode 100644
index 000000000000..98c96e43c1df
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SortDF.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import scala.Tuple2
+import org.apache.spark.SparkConf
+
+import org.apache.spark._
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
+//case class Counts(word: String, count: Int)
+
+object SortDF {
+
+ val SPACE = Pattern.compile(" ")
+
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ println("Usage: WordCount ");
+ System.exit(1);
+ }
+
+ val conf = new SparkConf().setAppName("Sort DF")
+
+ // han register kryo classes begin
+ /*
+ val tuple3ArrayClass = classOf[Array[Tuple3[Any, Any, Any]]]
+ val anonClass = Class.forName("scala.reflect.ClassTag$$anon$1")
+ val javaClassClass = classOf[java.lang.Class[Any]]
+ val StringClass = classOf[java.lang.String]
+ val StringArrayClass = classOf[Array[java.lang.String]]
+
+ conf.registerKryoClasses(Array(tuple3ArrayClass, anonClass, javaClassClass, StringClass, StringArrayClass))
+ */
+ // han register kryo classes end
+
+ val sc = new SparkContext(conf)
+
+ // han sampler 1 begin
+ val SAMPLING_PERIOD: Long = 10
+ val TIMESTAMP_PERIOD: Long = 1000
+
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+ val dir_application = new File(dirname_application)
+ if (!dir_application.exists())
+ dir_application.mkdirs()
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+ sc.getExecutorStorageStatus.foreach {
+ es =>
+ val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+ val file = new File(filename)
+ val writer = new FileWriter(file, true)
+ if (!file.exists()) {
+ file.createNewFile()
+ writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+ writer.flush()
+ //writer.close()
+ }
+
+ var s = es.memUsed.toString()
+ //println(s)
+ if (i % TIMESTAMP_PERIOD == 0) {
+ i = 0
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+ writer.write(s + "\n")
+ writer.flush()
+ writer.close()
+ }
+ i = i + SAMPLING_PERIOD
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ import sqlContext.implicits._
+
+ val lines = sc.textFile(args(0), 1)
+ //val parallel = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)/2
+ val data = lines.map((_, 1)).map(t=>Counts(t._1, t._2)).toDF()
+ data.registerTempTable("data")
+
+ //val partitioner = new HashPartitioner(partitions = parallel)
+ val sorted = data.sort("word").select("word")
+
+ sorted.save(args(1))
+
+ //output.foreach(t => println(t._1 + ": " + t._2))
+
+ sc.stop()
+
+ // han sampler 2 begin
+ f.cancel(true)
+ // han sampler 2 end
+ sys.exit(0)
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/WordCount.scala b/examples/src/main/scala/org/apache/spark/examples/WordCount.scala
new file mode 100644
index 000000000000..09fd8a03267a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/WordCount.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import scala.Tuple2
+import org.apache.spark.SparkConf
+
+import org.apache.spark._
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
+object WordCount {
+
+ val SPACE = Pattern.compile(" ")
+
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ println("Usage: WordCount ");
+ System.exit(1);
+ }
+
+ val conf = new SparkConf().setAppName("WordCount")
+ val sc = new SparkContext(conf)
+
+ // han sampler 1 begin
+ val SAMPLING_PERIOD: Long = 10
+ val TIMESTAMP_PERIOD: Long = 1000
+
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+ val dir_application = new File(dirname_application)
+ if (!dir_application.exists())
+ dir_application.mkdirs()
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+ sc.getExecutorStorageStatus.foreach {
+ es =>
+ val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+ val file = new File(filename)
+ val writer = new FileWriter(file, true)
+ if (!file.exists()) {
+ file.createNewFile()
+ writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+ writer.flush()
+ //writer.close()
+ }
+ var s = es.memUsed.toString()
+ //println(s)
+ if (i % TIMESTAMP_PERIOD == 0) {
+ i = 0
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+ writer.write(s + "\n")
+ writer.flush()
+ writer.close()
+ }
+ i = i + SAMPLING_PERIOD
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
+ val lines = sc.textFile(args(0), 1)
+
+ val words = lines.flatMap(l => SPACE.split(l))
+ val ones = words.map(w => (w,1))
+ val counts = ones.reduceByKey(_ + _)
+
+ //val output = counts.collect()
+ //output.foreach(t => println(t._1 + ": " + t._2))
+ counts.saveAsTextFile(args(1))
+
+ sc.stop()
+
+ // han sampler 2 begin
+ f.cancel(true)
+ // han sampler 2 end
+ sys.exit(0)
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala b/examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala
new file mode 100644
index 000000000000..c252d37d5afa
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import scala.Tuple2
+import org.apache.spark.SparkConf
+
+import org.apache.spark._
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
+case class Counts(word: String, one: Int)
+
+object WordCountDF {
+
+ val SPACE = Pattern.compile(" ")
+
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ println("Usage: WordCount ");
+ System.exit(1);
+ }
+
+ val conf = new SparkConf().setAppName("WordCount DF")
+ val sc = new SparkContext(conf)
+
+ // han sampler 1 begin
+ val SAMPLING_PERIOD: Long = 10
+ val TIMESTAMP_PERIOD: Long = 1000
+
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+ val dir_application = new File(dirname_application)
+ if (!dir_application.exists())
+ dir_application.mkdirs()
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+ sc.getExecutorStorageStatus.foreach {
+ es =>
+ val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+ val file = new File(filename)
+ val writer = new FileWriter(file, true)
+ if (!file.exists()) {
+ file.createNewFile()
+ writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+ writer.flush()
+ //writer.close()
+ }
+ var s = es.memUsed.toString()
+ //println(s)
+ if (i % TIMESTAMP_PERIOD == 0) {
+ i = 0
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+ writer.write(s + "\n")
+ writer.flush()
+ writer.close()
+ }
+ i = i + SAMPLING_PERIOD
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
+ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ import sqlContext.implicits._
+
+ val lines = sc.textFile(args(0), 1)
+
+ val words = lines.flatMap(l => SPACE.split(l))
+ val ones = words.map(w => (w,1)).map(t => Counts(t._1, t._2)).toDF()
+ ones.registerTempTable("ones")
+
+ val counts = sqlContext.sql("SELECT word, COUNT(*) FROM ones GROUP BY word")
+
+ val output = counts.save(args(1))
+ //output.foreach(t => println(t._1 + ": " + t._2))
+
+ sc.stop()
+
+ // han sampler 2 begin
+ f.cancel(true)
+ // han sampler 2 end
+ sys.exit(0)
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
index 8dd6c9706e7d..763cffcbf511 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
@@ -25,6 +25,16 @@ import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.PartitionStrategy._
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
/**
* Driver program for running graph algorithms.
*/
@@ -52,12 +62,39 @@ object Analytics extends Logging {
val options = mutable.Map(optionsList: _*)
val conf = new SparkConf()
+ // han register kryo classes begin
+ /*
+ val intArrayClass = classOf[Array[Int]]
+ val doubleArrayClass = classOf[Array[Double]]
+ val EdgePartitionClass = Class.forName("org.apache.spark.graphx.impl.EdgePartition$mcI$sp")
+ val NonePartitionClass = Class.forName("scala.None$")
+ val ManifestFactory_anon_9Class = Class.forName("scala.reflect.ManifestFactory$$anon$9")
+ val GraphXPrimitiveKeyOpenHashMap_mcJI_spClass = Class.forName("org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp")
+ val ManifestFactory_anon_10Class = Class.forName("scala.reflect.ManifestFactory$$anon$10")
+ val GraphXPrimitiveKeyOpenHashMap_anonfun_1Class = Class.forName("org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$$anonfun$1")
+ val OpenHashSet_LongHasherClass = Class.forName("org.apache.spark.util.collection.OpenHashSet$LongHasher")
+ val GraphXPrimitiveKeyOpenHashMap_anonfun_2Class = Class.forName("org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$$anonfun$2")
+ val ShippableVertexPartitionClass = Class.forName("org.apache.spark.graphx.impl.ShippableVertexPartition")
+ val RoutingTablePartitionClass = Class.forName("org.apache.spark.graphx.impl.RoutingTablePartition")
+ val Tuple3ArrayClass = classOf[Array[scala.Tuple3[Any, Any, Any]]]
+ val ManifestFactory_anon_12Class = Class.forName("scala.reflect.ManifestFactory$$anon$12")
+
+
+ conf.registerKryoClasses(Array(intArrayClass, doubleArrayClass, EdgePartitionClass, NonePartitionClass, ManifestFactory_anon_9Class, GraphXPrimitiveKeyOpenHashMap_mcJI_spClass, ManifestFactory_anon_10Class, GraphXPrimitiveKeyOpenHashMap_anonfun_1Class, OpenHashSet_LongHasherClass, GraphXPrimitiveKeyOpenHashMap_anonfun_2Class, ShippableVertexPartitionClass, RoutingTablePartitionClass, Tuple3ArrayClass, ManifestFactory_anon_12Class))
+ */
+ // han register kryo classes end
+
GraphXUtils.registerKryoClasses(conf)
val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
println("Set the number of edge partitions using --numEPart.")
sys.exit(1)
}
+
+ // han
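+ // Single --storageLevel option applied below to both edge and vertex RDDs and passed through
+ // to PageRank; note it remains null when the flag is not supplied.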
+ var storageLevel: StorageLevel = options.remove("storageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(null)
+
val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
.map(PartitionStrategy.fromString(_))
val edgeStorageLevel = options.remove("edgeStorageLevel")
@@ -68,7 +105,11 @@ object Analytics extends Logging {
taskType match {
case "pagerank" =>
val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
- val outFname = options.remove("output").getOrElse("")
+
+ // han
+ // val outFname = options.remove("output").getOrElse("")
+ var outFname = options.remove("output").getOrElse("")
+
val numIterOpt = options.remove("numIter").map(_.toInt)
options.foreach {
@@ -81,19 +122,88 @@ object Analytics extends Logging {
val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
+ // han
+ if (!outFname.isEmpty) {
+ outFname += sc.applicationId
+ }
+
+ // han sampler 1 begin
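+ // Same driver-side sampler as in the example jobs; here each row also records the executor's
+ // maximum storage memory and off-heap usage.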
+ val SAMPLING_PERIOD: Long = 10
+ val TIMESTAMP_PERIOD: Long = 1000
+
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+
+ val dir_application = new File(dirname_application)
+ if (!dir_application.exists())
+ dir_application.mkdirs()
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+ sc.getExecutorStorageStatus.foreach {
+ es =>
+ val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+ val file = new File(filename)
+ val writer = new FileWriter(file, true)
+ if (!file.exists()) {
+ file.createNewFile()
+ writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+ writer.flush()
+ writer.close()
+ }
+ var s = es.memUsed.toString()
+
+ s += "\t" + es.maxMem.toString()
+ s += "\t" + es.offHeapUsed.toString()
+
+ //println(s)
+ if (i % TIMESTAMP_PERIOD == 0) {
+ i = 0
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+ writer.write(s + "\n")
+ writer.flush()
+ writer.close()
+ }
+ i = i + SAMPLING_PERIOD
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
- edgeStorageLevel = edgeStorageLevel,
- vertexStorageLevel = vertexStorageLevel).cache()
+
+ // han
+ // edgeStorageLevel = edgeStorageLevel,
+ // vertexStorageLevel = vertexStorageLevel)
+ edgeStorageLevel = storageLevel,
+ vertexStorageLevel = storageLevel)
+ .cache()
+
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
+ // han
+ // val pr = (numIterOpt match {
+ // case Some(numIter) => PageRank.run(graph, numIter)
+ // case None => PageRank.runUntilConvergence(graph, tol)
+ // }).vertices
+ // .cache()
val pr = (numIterOpt match {
- case Some(numIter) => PageRank.run(graph, numIter)
- case None => PageRank.runUntilConvergence(graph, tol)
- }).vertices.cache()
+ case Some(numIter) => PageRank.run_storageLevel(graph = graph, numIter = numIter, storageLevel = storageLevel)
+ case None => PageRank.runUntilConvergence_storageLevel(graph = graph, tol = tol, storageLevel = storageLevel)
+ }).vertices
+ .persist(storageLevel)
println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
@@ -104,6 +214,10 @@ object Analytics extends Logging {
sc.stop()
+ // han sampler 2 begin
+ f.cancel(true)
+ // han sampler 2 end
+
case "cc" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk
new file mode 100644
index 000000000000..8dd6c9706e7d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+import scala.collection.mutable
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.PartitionStrategy._
+
+/**
+ * Driver program for running graph algorithms.
+ */
+object Analytics extends Logging {
+
+ def main(args: Array[String]): Unit = {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: Analytics --numEPart= [other options]")
+ System.err.println("Supported 'taskType' as follows:")
+ System.err.println(" pagerank Compute PageRank")
+ System.err.println(" cc Compute the connected components of vertices")
+ System.err.println(" triangles Count the number of triangles")
+ System.exit(1)
+ }
+
+ val taskType = args(0)
+ val fname = args(1)
+ val optionsList = args.drop(2).map { arg =>
+ arg.dropWhile(_ == '-').split('=') match {
+ case Array(opt, v) => (opt -> v)
+ case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+ }
+ }
+ val options = mutable.Map(optionsList: _*)
+
+ val conf = new SparkConf()
+ GraphXUtils.registerKryoClasses(conf)
+
+ val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+ println("Set the number of edge partitions using --numEPart.")
+ sys.exit(1)
+ }
+ val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+ .map(PartitionStrategy.fromString(_))
+ val edgeStorageLevel = options.remove("edgeStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+ val vertexStorageLevel = options.remove("vertexStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
+ taskType match {
+ case "pagerank" =>
+ val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+ val outFname = options.remove("output").getOrElse("")
+ val numIterOpt = options.remove("numIter").map(_.toInt)
+
+ options.foreach {
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| PageRank |")
+ println("======================================")
+
+ val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
+
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ numEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ println("GRAPHX: Number of vertices " + graph.vertices.count)
+ println("GRAPHX: Number of edges " + graph.edges.count)
+
+ val pr = (numIterOpt match {
+ case Some(numIter) => PageRank.run(graph, numIter)
+ case None => PageRank.runUntilConvergence(graph, tol)
+ }).vertices.cache()
+
+ println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
+
+ if (!outFname.isEmpty) {
+ logWarning("Saving pageranks of pages to " + outFname)
+ pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
+ }
+
+ sc.stop()
+
+ case "cc" =>
+ options.foreach {
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| Connected Components |")
+ println("======================================")
+
+ val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ numEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
+ sc.stop()
+
+ case "triangles" =>
+ options.foreach {
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ println("======================================")
+ println("| Triangle Count |")
+ println("======================================")
+
+ val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
+ val graph = GraphLoader.edgeListFile(sc, fname,
+ canonicalOrientation = true,
+ numEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel)
+ // TriangleCount requires the graph to be partitioned
+ .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
+ val triangles = TriangleCount.run(graph)
+ println("Triangles: " + triangles.vertices.map {
+ case (vid, data) => data.toLong
+ }.reduce(_ + _) / 3)
+ sc.stop()
+
+ case _ =>
+ println("Invalid task type.")
+ }
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
index da3ffca1a6f2..030b7a549894 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
@@ -24,7 +24,13 @@ package org.apache.spark.examples.graphx
*/
object LiveJournalPageRank {
def main(args: Array[String]) {
- if (args.length < 1) {
+
+ // han
+ println("Application starts:\n" + System.nanoTime())
+
+ // han
+ // if (args.length < 1) {
+ if (args.length < 2) {
System.err.println(
"Usage: LiveJournalPageRank \n" +
" --numEPart=\n" +
@@ -36,11 +42,20 @@ object LiveJournalPageRank {
" If specified, the file to write the ranks to.\n" +
" [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " +
"CanonicalRandomVertexCut]\n" +
- " The way edges are assigned to edge partitions. Default is RandomVertexCut.")
+ " The way edges are assigned to edge partitions. Default is RandomVertexCut." +
+ // han
+ " [--numIter=]" +
+ " Max number of iterations" +
+ " --storageLevel=\n" +
+ "Storage level of RDD caching (NONE, MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY)")
System.exit(-1)
}
Analytics.main(args.patch(0, List("pagerank"), 0))
+
+ // han
+ println("Application completes:\n" + System.nanoTime())
+ sys.exit(0)
}
}
// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk
new file mode 100644
index 000000000000..da3ffca1a6f2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.graphx
+
+/**
+ * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from
+ * http://snap.stanford.edu/data/soc-LiveJournal1.html.
+ */
+object LiveJournalPageRank {
+ def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println(
+ "Usage: LiveJournalPageRank \n" +
+ " --numEPart=\n" +
+ " The number of partitions for the graph's edge RDD.\n" +
+ " [--tol=]\n" +
+ " The tolerance allowed at convergence (smaller => more accurate). Default is " +
+ "0.001.\n" +
+ " [--output=]\n" +
+ " If specified, the file to write the ranks to.\n" +
+ " [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " +
+ "CanonicalRandomVertexCut]\n" +
+ " The way edges are assigned to edge partitions. Default is RandomVertexCut.")
+ System.exit(-1)
+ }
+
+ Analytics.main(args.patch(0, List("pagerank"), 0))
+ }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
index 380d85d60e7b..4d5348bbfc2a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
@@ -18,13 +18,28 @@
// scalastyle:off println
package org.apache.spark.examples.mllib
-import org.apache.log4j.{Level, Logger}
+import org.apache.log4j.{ Level, Logger }
import scopt.OptionParser
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.mllib.clustering.KMeans
+import org.apache.spark.{ SparkConf, SparkContext }
+import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.Vectors
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.hadoop.io.LongWritable
+import org.apache.mahout.math.VectorWritable
+
+// han sampler import end
+
/**
* An example k-means app. Run with
* {{{
@@ -42,12 +57,21 @@ object DenseKMeans {
import InitializationMode._
case class Params(
- input: String = null,
- k: Int = -1,
- numIterations: Int = 10,
- initializationMode: InitializationMode = Parallel) extends AbstractParams[Params]
+ input: String = null,
+ k: Int = -1,
+ numIterations: Int = 10,
+
+ // han
+ storageLevel: String = "NONE",
+ minNumPartitions: Int = -1,
+
+ initializationMode: InitializationMode = Parallel) extends AbstractParams[Params]
def main(args: Array[String]) {
+
+ // han
+ println("Application starts:\n" + System.nanoTime())
+
val defaultParams = Params()
val parser = new OptionParser[Params]("DenseKMeans") {
@@ -56,12 +80,22 @@ object DenseKMeans {
.required()
.text(s"number of clusters, required")
.action((x, c) => c.copy(k = x))
+
+ // han
+ opt[String]("storageLevel")
+ .required()
+ .text(s"storage level of RDD caching (NONE, MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY), required")
+ .action((x, c) => c.copy(storageLevel = x))
+ opt[Int]("minNumPartitions")
+ .text(s"minimal number of partitions of the input file")
+ .action((x, c) => c.copy(minNumPartitions = x))
+
opt[Int]("numIterations")
.text(s"number of iterations, default: ${defaultParams.numIterations}")
.action((x, c) => c.copy(numIterations = x))
opt[String]("initMode")
.text(s"initialization mode (${InitializationMode.values.mkString(",")}), " +
- s"default: ${defaultParams.initializationMode}")
+ s"default: ${defaultParams.initializationMode}")
.action((x, c) => c.copy(initializationMode = InitializationMode.withName(x)))
arg[String]("")
.text("input paths to examples")
@@ -74,17 +108,106 @@ object DenseKMeans {
}.getOrElse {
sys.exit(1)
}
+
+ // han
+ println("Application completes:\n" + System.nanoTime())
+
}
def run(params: Params) {
val conf = new SparkConf().setAppName(s"DenseKMeans with $params")
+
+ // mayuresh
+ //if (params.storageLevel=="MEMORY_ONLY_SER") {
+ // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ // conf.set("spark.kryo.registrationRequired", "true")
+ // conf.registerKryoClasses(Array(classOf[Array[java.lang.Double]],classOf[org.apache.spark.mllib.linalg.DenseVector]))
+ //}
+
val sc = new SparkContext(conf)
- Logger.getRootLogger.setLevel(Level.WARN)
+ // han sampler 1 begin
+ val SAMPLING_PERIOD: Long = 10
+ val TIMESTAMP_PERIOD: Long = 1000
+
+ var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+ val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+ val dir_application = new File(dirname_application)
+ if (!dir_application.exists())
+ dir_application.mkdirs()
+
+ val ex = new ScheduledThreadPoolExecutor(1)
+ val task = new Runnable {
+ var i: Long = 0
+ override def run {
+
+ // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+ sc.getExecutorStorageStatus.foreach {
+ es =>
+ val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+ val file = new File(filename)
+ val writer = new FileWriter(file, true)
+ if (!file.exists()) {
+ file.createNewFile()
+ writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+ writer.flush()
+ writer.close()
+ }
+ var s = es.memUsed.toString()
+ //println(s)
+ if (i % TIMESTAMP_PERIOD == 0) {
+ i = 0
+ var time: String = dateFormat.format(new Date())
+ s += "\t" + time
+ }
+
+ writer.write(s + "\n")
+ writer.flush()
+ writer.close()
+ }
+ i = i + SAMPLING_PERIOD
+ }
+ }
+ val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+ // han sampler 1 end
+
+ // han
+ // Logger.getRootLogger.setLevel(Level.WARN)
+
+ // han
+ // val examples = sc.textFile(params.input).map { line =>
+ // Vectors.dense(line.split(' ').map(_.toDouble))
+ // }.cache()
+
+ var storageLevel: StorageLevel = null
+ storageLevel = params.storageLevel match {
+ case "NONE" => StorageLevel.NONE
+ case "MEMORY_ONLY" => StorageLevel.MEMORY_ONLY
+ case "MEMORY_AND_DISK" => StorageLevel.MEMORY_AND_DISK
+ case "DISK_ONLY" => StorageLevel.DISK_ONLY
+ case "MEMORY_ONLY_SER" => StorageLevel.MEMORY_ONLY_SER
+ }
- val examples = sc.textFile(params.input).map { line =>
+ var examples: RDD[Vector] = null
+ /*
+ if (params.minNumPartitions < 0)
+ examples = sc.textFile(params.input).map { line =>
+ // examples = sc.binaryFiles(params.input, 1).map { line =>
+ Vectors.dense(line.split(' ').map(_.toDouble))
+ }.persist(storageLevel)
+ else examples = sc.textFile(params.input, params.minNumPartitions).map { line =>
Vectors.dense(line.split(' ').map(_.toDouble))
- }.cache()
+ }.persist(storageLevel)
+ */
+
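+ // Read the input as a Hadoop SequenceFile of (LongWritable, Mahout VectorWritable) records and
+ // copy each Mahout vector into a dense MLlib vector, persisted at the requested storage level.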
+ val data = sc.sequenceFile[LongWritable, VectorWritable](params.input)
+ examples = data.map {
+ case (k, v) =>
+ var vector: Array[Double] = new Array[Double](v.get().size)
+ for (i <- 0 until v.get().size) vector(i) = v.get().get(i)
+ Vectors.dense(vector)
+ }.persist(storageLevel)
val numExamples = examples.count()
@@ -95,17 +218,36 @@ object DenseKMeans {
case Parallel => KMeans.K_MEANS_PARALLEL
}
+ // han
+ // val model = new KMeans()
+ // .setInitializationMode(initMode)
+ // .setK(params.k)
+ // .setMaxIterations(params.numIterations)
+ // .run(examples)
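+ // The .setStorageLevel call below relies on a setter that is not defined in this hunk and
+ // appears to come from a companion change to MLlib's KMeans rather than the stock API.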
val model = new KMeans()
.setInitializationMode(initMode)
.setK(params.k)
.setMaxIterations(params.numIterations)
+ .setStorageLevel(storageLevel)
.run(examples)
val cost = model.computeCost(examples)
println(s"Total cost = $cost.")
+ // han
+ // Save and load model
+ /*
+ val outputPath = "/kmeans-huge/output/" + sc.applicationId
+ model.save(sc, outputPath)
+ val sameModel = KMeansModel.load(sc, outputPath)
+ */
sc.stop()
+
+ // han sampler 2 begin
+ f.cancel(true)
+ sys.exit(0)
+ // han sampler 2 end
}
}
// scalastyle:on println
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index 52b237fc1509..9b5a499dae93 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -23,6 +23,11 @@ import scala.language.postfixOps
import org.apache.spark.Logging
import org.apache.spark.graphx._
+// han import begin
+import org.apache.spark.storage.StorageLevel
+import scala.collection.mutable.MutableList
+// han import end
+
/**
* PageRank algorithm implementation. There are two implementations of PageRank implemented.
*
@@ -61,6 +66,8 @@ import org.apache.spark.graphx._
*/
object PageRank extends Logging {
+ // han
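+ // Storage level injected by the *_storageLevel entry points below; runWithOptions and
+ // runUntilConvergenceWithOptions use it in place of cache().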
+ private var storageLevel: StorageLevel = null
/**
* Run PageRank for a fixed number of iterations returning a graph
@@ -79,9 +86,17 @@ object PageRank extends Logging {
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int,
resetProb: Double = 0.15): Graph[Double, Double] =
- {
- runWithOptions(graph, numIter, resetProb)
- }
+ {
+ runWithOptions(graph, numIter, resetProb)
+ }
+
+ // han
+ def run_storageLevel[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int,
+ resetProb: Double = 0.15, storageLevel: StorageLevel): Graph[Double, Double] =
+ {
+ this.storageLevel = storageLevel
+ run(graph, numIter, resetProb)
+ }
/**
* Run PageRank for a fixed number of iterations returning a graph
@@ -101,62 +116,87 @@ object PageRank extends Logging {
*
*/
def runWithOptions[VD: ClassTag, ED: ClassTag](
- graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
- srcId: Option[VertexId] = None): Graph[Double, Double] =
- {
- val personalized = srcId isDefined
- val src: VertexId = srcId.getOrElse(-1L)
-
- // Initialize the PageRank graph with each edge attribute having
- // weight 1/outDegree and each vertex with attribute resetProb.
- // When running personalized pagerank, only the source vertex
- // has an attribute resetProb. All others are set to 0.
- var rankGraph: Graph[Double, Double] = graph
- // Associate the degree with each vertex
- .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
- // Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
- // Set the vertex attributes to the initial pagerank values
- .mapVertices { (id, attr) =>
- if (!(id != src && personalized)) resetProb else 0.0
- }
+ graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
+ srcId: Option[VertexId] = None): Graph[Double, Double] =
+ {
+ // Initialize the PageRank graph with each edge attribute having
+ // weight 1/outDegree and each vertex with attribute 1.0.
+ var rankGraph: Graph[Double, Double] = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
+ // Set the weight on the edges based on the degree
+ .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
+ // Set the vertex attributes to the initial pagerank values
+ .mapVertices((id, attr) => resetProb)
- def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
+ val personalized = srcId isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+ def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
- var iteration = 0
- var prevRankGraph: Graph[Double, Double] = null
- while (iteration < numIter) {
- rankGraph.cache()
+ var iteration = 0
+ var prevRankGraph: Graph[Double, Double] = null
- // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
- // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
- val rankUpdates = rankGraph.aggregateMessages[Double](
- ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
+ // han
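+ // Track the wall-clock duration of each PageRank iteration; the per-iteration times are
+ // printed once the loop finishes.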
+ val iterationStartTime = System.nanoTime()
+ println("Iteration 1 starts:\n" + iterationStartTime)
+ var iDurs: MutableList[(Int, Double)] = new MutableList[(Int, Double)]
+ var iterationEndTime_last: Long = iterationStartTime
- // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
- // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
- // edge partitions.
- prevRankGraph = rankGraph
- val rPrb = if (personalized) {
- (src: VertexId , id: VertexId) => resetProb * delta(src, id)
- } else {
- (src: VertexId, id: VertexId) => resetProb
- }
+ while (iteration < numIter) {
+ rankGraph
+ // han: persist with the caller-selected storage level instead of the default cache()
+ // .cache()
+ .persist(storageLevel)
- rankGraph = rankGraph.joinVertices(rankUpdates) {
- (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
- }.cache()
+ // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
+ // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
+ val rankUpdates = rankGraph.aggregateMessages[Double](
+ ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
- rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
- logInfo(s"PageRank finished iteration $iteration.")
- prevRankGraph.vertices.unpersist(false)
- prevRankGraph.edges.unpersist(false)
+ // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
+ // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
+ // edge partitions.
+ prevRankGraph = rankGraph
+ val rPrb = if (personalized) {
+ (src: VertexId, id: VertexId) => resetProb * delta(src, id)
+ } else {
+ (src: VertexId, id: VertexId) => resetProb
+ }
- iteration += 1
- }
+ rankGraph = rankGraph.joinVertices(rankUpdates) {
+ (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+ }
+ // han: persist with the caller-selected storage level instead of the default cache()
+ // .cache()
+ .persist(storageLevel)
- rankGraph
- }
+ rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
+ logInfo(s"PageRank finished iteration $iteration.")
+
+ prevRankGraph.vertices.unpersist(false)
+ prevRankGraph.edges.unpersist(false)
+
+ // han: record this iteration's wall-clock duration in seconds
+ val iterationEndTime = System.nanoTime()
+ val iDuration: Double = (iterationEndTime - iterationEndTime_last) / 1e9
+ iDurs += ((iteration, iDuration))
+ println("Iteration " + iteration + ": " + iDuration)
+ iterationEndTime_last = iterationEndTime
+
+ iteration += 1
+ }
+
+ // han: summary of per-iteration durations
+ println("Iterations count: " + iteration)
+ println("Iterations complete (ns): " + System.nanoTime())
+ println("Iteration durations (s):")
+ iDurs.foreach { case (i, dur) =>
+ println("Iteration " + i + ": " + dur)
+ }
+
+ rankGraph
+ }
/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
@@ -174,9 +214,17 @@ object PageRank extends Logging {
*/
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
- {
+ {
runUntilConvergenceWithOptions(graph, tol, resetProb)
- }
+ }
+
+ // han: variant of runUntilConvergence() that lets the caller choose the StorageLevel
+ // used to persist the PageRank graph.
+ def runUntilConvergence_storageLevel[VD: ClassTag, ED: ClassTag](
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
+ storageLevel: StorageLevel): Graph[Double, Double] =
+ {
+ this.storageLevel = storageLevel
+ runUntilConvergence(graph, tol, resetProb)
+ }
/**
* Run a dynamic version of PageRank returning a graph with vertex attributes containing the
@@ -193,73 +241,74 @@ object PageRank extends Logging {
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
+
def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
- graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
- srcId: Option[VertexId] = None): Graph[Double, Double] =
- {
- val personalized = srcId.isDefined
- val src: VertexId = srcId.getOrElse(-1L)
-
- // Initialize the pagerankGraph with each edge attribute
- // having weight 1/outDegree and each vertex with attribute 1.0.
- val pagerankGraph: Graph[(Double, Double), Double] = graph
- // Associate the degree with each vertex
- .outerJoinVertices(graph.outDegrees) {
- (vid, vdata, deg) => deg.getOrElse(0)
- }
- // Set the weight on the edges based on the degree
- .mapTriplets( e => 1.0 / e.srcAttr )
- // Set the vertex attributes to (initalPR, delta = 0)
- .mapVertices { (id, attr) =>
- if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+ graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
+ srcId: Option[VertexId] = None): Graph[Double, Double] =
+ {
+
+ // Initialize the pagerankGraph with each edge attribute
+ // having weight 1/outDegree and each vertex with attribute 1.0.
+ val pagerankGraph: Graph[(Double, Double), Double] = graph
+ // Associate the degree with each vertex
+ .outerJoinVertices(graph.outDegrees) {
+ (vid, vdata, deg) => deg.getOrElse(0)
+ }
+ // Set the weight on the edges based on the degree
+ .mapTriplets(e => 1.0 / e.srcAttr)
+ // Set the vertex attributes to (initialPR, delta = 0)
+ .mapVertices((id, attr) => (0.0, 0.0))
+ // han: persist with the caller-selected storage level instead of the default cache()
+ // .cache()
+ .persist(storageLevel)
+
+ val personalized = srcId.isDefined
+ val src: VertexId = srcId.getOrElse(-1L)
+
+ // Define the three functions needed to implement PageRank in the GraphX
+ // version of Pregel
+ def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ val (oldPR, lastDelta) = attr
+ val newPR = oldPR + (1.0 - resetProb) * msgSum
+ (newPR, newPR - oldPR)
}
- .cache()
-
- // Define the three functions needed to implement PageRank in the GraphX
- // version of Pregel
- def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
- val (oldPR, lastDelta) = attr
- val newPR = oldPR + (1.0 - resetProb) * msgSum
- (newPR, newPR - oldPR)
- }
- def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
- msgSum: Double): (Double, Double) = {
- val (oldPR, lastDelta) = attr
- var teleport = oldPR
- val delta = if (src==id) 1.0 else 0.0
- teleport = oldPR*delta
+ def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
+ msgSum: Double): (Double, Double) = {
+ val (oldPR, lastDelta) = attr
+ var teleport = oldPR
+ val delta = if (src == id) 1.0 else 0.0
+ teleport = oldPR * delta
- val newPR = teleport + (1.0 - resetProb) * msgSum
- val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
- (newPR, newDelta)
- }
+ val newPR = teleport + (1.0 - resetProb) * msgSum
+ (newPR, newPR - oldPR)
+ }
- def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
- if (edge.srcAttr._2 > tol) {
- Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
- } else {
- Iterator.empty
+ def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
+ if (edge.srcAttr._2 > tol) {
+ Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
+ } else {
+ Iterator.empty
+ }
}
- }
- def messageCombiner(a: Double, b: Double): Double = a + b
+ def messageCombiner(a: Double, b: Double): Double = a + b
- // The initial message received by all vertices in PageRank
- val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)
+ // The initial message received by all vertices in PageRank
+ val initialMessage = resetProb / (1.0 - resetProb)
- // Execute a dynamic version of Pregel.
- val vp = if (personalized) {
- (id: VertexId, attr: (Double, Double), msgSum: Double) =>
- personalizedVertexProgram(id, attr, msgSum)
- } else {
- (id: VertexId, attr: (Double, Double), msgSum: Double) =>
- vertexProgram(id, attr, msgSum)
- }
+ // Execute a dynamic version of Pregel.
+ val vp = if (personalized) {
+ (id: VertexId, attr: (Double, Double), msgSum: Double) =>
+ personalizedVertexProgram(id, attr, msgSum)
+ } else {
+ (id: VertexId, attr: (Double, Double), msgSum: Double) =>
+ vertexProgram(id, attr, msgSum)
+ }
- Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
- vp, sendMessage, messageCombiner)
- .mapVertices((vid, attr) => attr._1)
- } // end of deltaPageRank
+ Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
+ vp, sendMessage, messageCombiner)
+ .mapVertices((vid, attr) => attr._1)
+ } // end of deltaPageRank
}
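
A minimal usage sketch for the storage-level-aware PageRank entry points added above. The
application name, edge-list path, iteration count, and storage level are placeholders, and
the sketch assumes the storageLevel field referenced by run_storageLevel is defined on the
PageRank object elsewhere in this patch; everything else is standard GraphX API.

    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.graphx.lib.PageRank
    import org.apache.spark.storage.StorageLevel

    object PageRankStorageLevelExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("PageRankStorageLevelExample"))
        // Placeholder input path; each line is "srcId dstId".
        val graph = GraphLoader.edgeListFile(sc, "hdfs:///tmp/edges.txt")
        // Persist the rank graph to memory-and-disk (serialized) instead of the default cache().
        val ranks = PageRank.run_storageLevel(
          graph, numIter = 10, storageLevel = StorageLevel.MEMORY_AND_DISK_SER)
        ranks.vertices.take(10).foreach(println)
        sc.stop()
      }
    }
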
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 2895db7c9061..ffd5c90021ff 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -20,15 +20,19 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
-import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
-import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
+import org.apache.spark.annotation.{ Experimental, Since }
+import org.apache.spark.mllib.linalg.{ Vector, Vectors }
+import org.apache.spark.mllib.linalg.BLAS.{ axpy, scal }
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
import org.apache.spark.util.random.XORShiftRandom
+// han import begin
+import scala.collection.mutable.MutableList
+// han import end
+
/**
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
@@ -69,6 +73,13 @@ class KMeans private (
this
}
+ // han: storage level for intermediate RDDs. There is no default, so callers must
+ // invoke setStorageLevel() before run().
+ private var storageLevel: StorageLevel = null
+ def setStorageLevel(storageLevel: StorageLevel): this.type = {
+ this.storageLevel = storageLevel
+ this
+ }
+
/**
* Maximum number of iterations to run.
*/
@@ -107,7 +118,7 @@ class KMeans private (
* Number of runs of the algorithm to execute in parallel.
*/
@Since("1.4.0")
- @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0")
+ @Experimental
def getRuns: Int = runs
/**
@@ -117,7 +128,7 @@ class KMeans private (
* return the best clustering found over any run. Default: 1.
*/
@Since("0.8.0")
- @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0")
+ @Experimental
def setRuns(runs: Int): this.type = {
if (runs <= 0) {
throw new IllegalArgumentException("Number of runs must be positive")
@@ -206,11 +217,17 @@ class KMeans private (
// Compute squared norms and cache them.
val norms = data.map(Vectors.norm(_, 2.0))
- norms.persist()
- val zippedData = data.zip(norms).map { case (v, norm) =>
- new VectorWithNorm(v, norm)
+ // han: persist with the caller-selected storage level instead of the default MEMORY_ONLY
+ // norms.persist()
+ norms.persist(storageLevel)
+
+ val zippedData = data.zip(norms).map {
+ case (v, norm) =>
+ new VectorWithNorm(v, norm)
}
val model = runAlgorithm(zippedData)
+
+ // han
norms.unpersist()
// Warn at the end of the run as well, for increased visibility.
@@ -261,6 +278,13 @@ class KMeans private (
var iteration = 0
val iterationStartTime = System.nanoTime()
+
+ // han: per-iteration timing instrumentation
+ println("Iterations start (ns): " + iterationStartTime)
+
+ val iDurs: MutableList[(Int, Double)] = new MutableList[(Int, Double)]()
+ var iterationEndTime_last: Long = iterationStartTime
// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {
@@ -323,32 +347,53 @@ class KMeans private (
}
costs(run) = costAccums(i).value
}
-
activeRuns = activeRuns.filter(active(_))
+
+ // han: record this iteration's wall-clock duration in seconds
+ val iterationEndTime = System.nanoTime()
+ val iDuration: Double = (iterationEndTime - iterationEndTime_last) / 1e9
+ iDurs += ((iteration, iDuration))
+ println("Iteration " + iteration + ": " + iDuration)
+ iterationEndTime_last = iterationEndTime
+
iteration += 1
}
+
+ // han: end-of-run timestamp
+ println("Iterations complete (ns): " + System.nanoTime())
+
val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.")
if (iteration == maxIterations) {
logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
+ // han
+ println(s"KMeans reached the max number of iterations: $maxIterations.")
} else {
logInfo(s"KMeans converged in $iteration iterations.")
+ // han
+ println(s"KMeans converged in $iteration iterations.")
}
val (minCost, bestRun) = costs.zipWithIndex.min
logInfo(s"The cost for the best run is $minCost.")
+ // han: per-iteration durations
+ println("Iteration durations (s):")
+ iDurs.foreach { case (i, dur) =>
+ println("Iteration " + i + ": " + dur)
+ }
+
new KMeansModel(centers(bestRun).map(_.vector))
}
/**
* Initialize `runs` sets of cluster centers at random.
*/
- private def initRandom(data: RDD[VectorWithNorm])
- : Array[Array[VectorWithNorm]] = {
+ private def initRandom(data: RDD[VectorWithNorm]): Array[Array[VectorWithNorm]] = {
// Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq
Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
@@ -365,8 +410,7 @@ class KMeans private (
*
* The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
*/
- private def initKMeansParallel(data: RDD[VectorWithNorm])
- : Array[Array[VectorWithNorm]] = {
+ private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[Array[VectorWithNorm]] = {
// Initialize empty centers and point costs.
val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
@@ -393,11 +437,21 @@ class KMeans private (
while (step < initializationSteps) {
val bcNewCenters = data.context.broadcast(newCenters)
val preCosts = costs
- costs = data.zip(preCosts).map { case (point, cost) =>
+
+ // han: persist per-point costs with the caller-selected storage level
+ // (previously hard-coded to MEMORY_AND_DISK)
+ // costs = data.zip(preCosts).map {
+ // case (point, cost) =>
+ // Array.tabulate(runs) { r =>
+ // math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
+ // }
+ // }.persist(StorageLevel.MEMORY_AND_DISK)
+ costs = data.zip(preCosts).map {
+ case (point, cost) =>
Array.tabulate(runs) { r =>
math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
}
- }.persist(StorageLevel.MEMORY_AND_DISK)
+ }.persist(storageLevel)
+
val sumCosts = costs
.aggregate(new Array[Double](runs))(
seqOp = (s, v) => {
@@ -417,26 +471,31 @@ class KMeans private (
r += 1
}
s0
- }
- )
+ })
+
+ // han
preCosts.unpersist(blocking = false)
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
- pointsWithCosts.flatMap { case (p, c) =>
- val rs = (0 until runs).filter { r =>
- rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
- }
- if (rs.length > 0) Some(p, rs) else None
+ pointsWithCosts.flatMap {
+ case (p, c) =>
+ val rs = (0 until runs).filter { r =>
+ rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
+ }
+ if (rs.length > 0) Some(p, rs) else None
}
}.collect()
mergeNewCenters()
- chosen.foreach { case (p, rs) =>
- rs.foreach(newCenters(_) += p.toDense)
+ chosen.foreach {
+ case (p, rs) =>
+ rs.foreach(newCenters(_) += p.toDense)
}
step += 1
}
mergeNewCenters()
+
+ // han
costs.unpersist(blocking = false)
// Finally, we might have a set of more than k candidate centers for each run; weigh each
@@ -458,7 +517,6 @@ class KMeans private (
}
}
-
/**
* Top-level methods for calling K-means clustering.
*/
@@ -483,12 +541,12 @@ object KMeans {
*/
@Since("1.3.0")
def train(
- data: RDD[Vector],
- k: Int,
- maxIterations: Int,
- runs: Int,
- initializationMode: String,
- seed: Long): KMeansModel = {
+ data: RDD[Vector],
+ k: Int,
+ maxIterations: Int,
+ runs: Int,
+ initializationMode: String,
+ seed: Long): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
@@ -508,11 +566,11 @@ object KMeans {
*/
@Since("0.8.0")
def train(
- data: RDD[Vector],
- k: Int,
- maxIterations: Int,
- runs: Int,
- initializationMode: String): KMeansModel = {
+ data: RDD[Vector],
+ k: Int,
+ maxIterations: Int,
+ runs: Int,
+ initializationMode: String): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
@@ -525,9 +583,9 @@ object KMeans {
*/
@Since("0.8.0")
def train(
- data: RDD[Vector],
- k: Int,
- maxIterations: Int): KMeansModel = {
+ data: RDD[Vector],
+ k: Int,
+ maxIterations: Int): KMeansModel = {
train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
}
@@ -536,10 +594,10 @@ object KMeans {
*/
@Since("0.8.0")
def train(
- data: RDD[Vector],
- k: Int,
- maxIterations: Int,
- runs: Int): KMeansModel = {
+ data: RDD[Vector],
+ k: Int,
+ maxIterations: Int,
+ runs: Int): KMeansModel = {
train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
}
@@ -547,8 +605,8 @@ object KMeans {
* Returns the index of the closest center to the given point, as well as the squared distance.
*/
private[mllib] def findClosest(
- centers: TraversableOnce[VectorWithNorm],
- point: VectorWithNorm): (Int, Double) = {
+ centers: TraversableOnce[VectorWithNorm],
+ point: VectorWithNorm): (Int, Double) = {
var bestDistance = Double.PositiveInfinity
var bestIndex = 0
var i = 0
@@ -573,8 +631,8 @@ object KMeans {
* Returns the K-means cost of a given point against the given cluster centers.
*/
private[mllib] def pointCost(
- centers: TraversableOnce[VectorWithNorm],
- point: VectorWithNorm): Double =
+ centers: TraversableOnce[VectorWithNorm],
+ point: VectorWithNorm): Double =
findClosest(centers, point)._2
/**
@@ -582,8 +640,8 @@ object KMeans {
* [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]].
*/
private[clustering] def fastSquaredDistance(
- v1: VectorWithNorm,
- v2: VectorWithNorm): Double = {
+ v1: VectorWithNorm,
+ v2: VectorWithNorm): Double = {
MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
}
@@ -601,8 +659,7 @@ object KMeans {
*
* @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]]
*/
-private[clustering]
-class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable {
+private[clustering] class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable {
def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
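
A minimal sketch of driving the patched KMeans with an explicit storage level. The input
path, k, and iteration count are placeholders; setStorageLevel is the setter added by this
patch, and it must be called before run(), because the patched code hands the stored level
straight to persist() and provides no default.

    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.storage.StorageLevel

    object KMeansStorageLevelExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("KMeansStorageLevelExample"))
        // Placeholder input path; each line is a space-separated feature vector.
        val data = sc.textFile("hdfs:///tmp/kmeans-input.txt")
          .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
          .persist(StorageLevel.MEMORY_AND_DISK_SER)

        val model = new KMeans()
          .setK(10)
          .setMaxIterations(20)
          .setStorageLevel(StorageLevel.MEMORY_AND_DISK_SER) // setter added by this patch
          .run(data)

        println("Cluster centers:")
        model.clusterCenters.foreach(println)
        sc.stop()
      }
    }
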
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 0d6b215fe5aa..5a85d9ac942a 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -19,6 +19,8 @@
import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentHashMap;
+
import sun.misc.Unsafe;
public final class Platform {
@@ -33,6 +35,10 @@ public final class Platform {
public static final int DOUBLE_ARRAY_OFFSET;
+ // mayuresh: hacky way to find offheap usage. Note that "+=" on a volatile long is not
+ // atomic, so TOTAL_BYTES is a best-effort estimate under concurrent allocations.
+ private static final ConcurrentHashMap<Long, Long> ADDRESS_SIZE_MAP = new ConcurrentHashMap<>();
+ public static volatile long TOTAL_BYTES = 0L;
+
public static int getInt(Object object, long offset) {
return _UNSAFE.getInt(object, offset);
}
@@ -98,10 +104,19 @@ public static void putObjectVolatile(Object object, long offset, Object value) {
}
public static long allocateMemory(long size) {
- return _UNSAFE.allocateMemory(size);
+ long address = _UNSAFE.allocateMemory(size);
+ // mayuresh: hacky way of tracking offheap usage
+ TOTAL_BYTES += size;
+ ADDRESS_SIZE_MAP.put(address, size);
+ System.out.println("-- Adding to offheap: " + java.util.Arrays.toString(Thread.currentThread().getStackTrace()));
+ return address;
}
public static void freeMemory(long address) {
+ // mayuresh: hacky way of tracking offheap usage
+ Long trackedSize = ADDRESS_SIZE_MAP.remove(address);
+ if (trackedSize != null) {
+ TOTAL_BYTES -= trackedSize;
+ }
+ System.out.println("-- Removing from offheap: " + java.util.Arrays.toString(Thread.currentThread().getStackTrace()));
_UNSAFE.freeMemory(address);
}