From db21ec0163c8f2d7a91a663804e097333344745f Mon Sep 17 00:00:00 2001 From: Mayuresh Kunjir Date: Mon, 21 Dec 2015 11:36:18 -0500 Subject: [PATCH 1/2] Changes for memory profiling including benchmark applications --- .../CoarseGrainedExecutorBackend.scala | 300 ++++++++++++++++-- .../org/apache/spark/examples/Sort.scala | 131 ++++++++ .../org/apache/spark/examples/SortDF.scala | 138 ++++++++ .../org/apache/spark/examples/WordCount.scala | 117 +++++++ .../apache/spark/examples/WordCountDF.scala | 123 +++++++ .../spark/examples/graphx/Analytics.scala | 126 +++++++- .../spark/examples/graphx/Analytics.scala_bk | 155 +++++++++ .../examples/graphx/LiveJournalPageRank.scala | 19 +- .../graphx/LiveJournalPageRank.scala_bk | 46 +++ .../apache/spark/graphx/lib/PageRank.scala | 271 +++++++++------- .../spark/mllib/clustering/KMeans.scala | 159 +++++++--- .../org/apache/spark/unsafe/Platform.java | 42 ++- 12 files changed, 1402 insertions(+), 225 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/Sort.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/SortDF.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/WordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk create mode 100644 examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c2ebf3059621..96acb6b6db15 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -23,7 +23,7 @@ import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } import org.apache.spark.rpc._ import org.apache.spark._ @@ -33,17 +33,33 @@ import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils} +import org.apache.spark.util.{ ThreadUtils, SignalLogger, Utils } + +// han sampler import begin +import java.io._ +import org.apache.spark.storage._ +import java.lang.System +import java.util.concurrent._ +import java.lang.management._ +import java.util.List +import java.util.Date +import scala.collection.JavaConversions._ +import java.text._ +import scala.util.Properties +import scala.sys.process._ +// han sampler import end private[spark] class CoarseGrainedExecutorBackend( - override val rpcEnv: RpcEnv, - driverUrl: String, - executorId: String, - hostPort: String, - cores: Int, - userClassPath: Seq[URL], - env: SparkEnv) - extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { + override val rpcEnv: RpcEnv, + driverUrl: String, + executorId: String, + hostPort: String, + cores: Int, + userClassPath: Seq[URL], + env: SparkEnv) + extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { + + Utils.checkHostPort(hostPort, "Expected hostport") var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None @@ -57,12 +73,12 @@ private[spark] 
class CoarseGrainedExecutorBackend( rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) - ref.ask[RegisterExecutorResponse]( + ref.ask[RegisteredExecutor.type]( RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => Utils.tryLogNonFatalError { - Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse + Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor } case Failure(e) => { logError(s"Cannot register with driver: $driverUrl", e) @@ -78,8 +94,9 @@ private[spark] class CoarseGrainedExecutorBackend( } override def receive: PartialFunction[Any, Unit] = { - case RegisteredExecutor(hostname) => + case RegisteredExecutor => logInfo("Successfully registered with driver") + val (hostname, _) = Utils.parseHostPort(hostPort) executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) case RegisterExecutorFailed(message) => @@ -107,11 +124,6 @@ private[spark] class CoarseGrainedExecutorBackend( case StopExecutor => logInfo("Driver commanded a shutdown") - // Cannot shutdown here because an ack may need to be sent back to the caller. So send - // a message to self to actually do the shutdown. - self.send(Shutdown) - - case Shutdown => executor.stop() stop() rpcEnv.shutdown() @@ -138,13 +150,13 @@ private[spark] class CoarseGrainedExecutorBackend( private[spark] object CoarseGrainedExecutorBackend extends Logging { private def run( - driverUrl: String, - executorId: String, - hostname: String, - cores: Int, - appId: String, - workerUrl: Option[String], - userClassPath: Seq[URL]) { + driverUrl: String, + executorId: String, + hostname: String, + cores: Int, + appId: String, + workerUrl: Option[String], + userClassPath: Seq[URL]) { SignalLogger.register(log) @@ -160,8 +172,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { hostname, port, executorConf, - new SecurityManager(executorConf), - clientMode = true) + new SecurityManager(executorConf)) val driver = fetcher.setupEndpointRefByURI(driverUrl) val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++ Seq[(String, String)](("spark.app.id", appId)) @@ -186,12 +197,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) - // SparkEnv will set spark.executor.port if the rpc env is listening for incoming - // connections (e.g., if it's using akka). Otherwise, the executor is running in - // client mode only, and does not accept incoming connections. - val sparkHostPort = env.conf.getOption("spark.executor.port").map { port => - hostname + ":" + port - }.orNull + // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore. + val boundPort = env.conf.getInt("spark.executor.port", 0) + assert(boundPort != 0) + + // Start the CoarseGrainedExecutorBackend endpoint. 
+ val sparkHostPort = hostname + ":" + boundPort env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) workerUrl.foreach { url => @@ -202,7 +213,50 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } } + def getProcessUsedMemoryAndCPU(processID: String): (Double, Double) = { + var memory = 0d; + var CPU = 0d; + val NUMCPU = 8; + var process = "" +/* val commands = new java.util.Vector[String]() + commands.add("/bin/bash") + commands.add("-c") + commands.add("ps aux | grep " + processID) + val pb=new java.lang.ProcessBuilder(commands) + val pr=pb.start() + pr.waitFor() + if (pr.exitValue()==0) { + val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pr.getInputStream())); + var source = "" + source = outReader.readLine() + while(source != null) { + try { + val tokens = source.split(" +") + if(tokens(1).equals(processID)) { + memory = 1024L*tokens(5).toLong + CPU = tokens(2).toDouble / NUMCPU + } + } catch { case e: Exception => () } + finally { + source = outReader.readLine() + } + } + } +*/ + + val topout = Seq("/bin/sh", "-c", "top -n 1 -b -p " + processID + " | tail -1").!!.trim.split(" +") + val len = topout(5).length + if(topout(5).endsWith("g")) { memory = 1024L*1024L*1024L*topout(5).take(len-1).toDouble } + else if(topout(5).endsWith("m")) { memory = 1024L*1024L*topout(5).take(len-1).toDouble } + else if(topout(5).endsWith("k")) { memory = 1024L*topout(5).take(len-1).toDouble } + else { memory = topout(5).toDouble } + CPU = topout(8).toDouble / NUMCPU + + return (memory, CPU) + } + def main(args: Array[String]) { + var driverUrl: String = null var executorId: String = null var hostname: String = null @@ -250,7 +304,186 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { printUsageAndExit() } + // han sampler 1 begin + val SAMPLING_PERIOD: Long = 10 + val JMAP_PERIOD: Long = 5000 + val TIMESTAMP_PERIOD: Long = 1000 + var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss") + + val dirname_executor = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + appId + "/" + executorId + val dir_executor = new File(dirname_executor) + if (!dir_executor.exists()) + dir_executor.mkdirs() + val dirname_histo = dirname_executor + "/histo" + val dir_histo = new File(dirname_histo) + if (!dir_histo.exists()) + dir_histo.mkdirs() + + // val writer = new FileWriter(new File("/home/ubuntu/sparkOutput/sparkOutput_executor_" + System.nanoTime() + ".txt"), true) + val writer = new FileWriter(new File(dirname_executor + "/" + "sparkOutput_worker_" + appId + "_" + executorId + ".txt"), true) + writer.write(appId + "_" + executorId + "\n") + writer.flush() + + var osBean: com.sun.management.OperatingSystemMXBean = ManagementFactory.getPlatformMXBean(classOf[com.sun.management.OperatingSystemMXBean]) + var availableProcessors: Int = osBean.getAvailableProcessors() + logInfo("Number of available processors for executor " + executorId + ": " + availableProcessors) + var avgUsedCPU: Double = 0 + var numberOfCPUSamples: Long = 1 + var memBean: MemoryMXBean = ManagementFactory.getMemoryMXBean() + var rtBean: RuntimeMXBean = ManagementFactory.getRuntimeMXBean() + var mpBeans: List[MemoryPoolMXBean] = ManagementFactory.getMemoryPoolMXBeans() + var s: String = "" + mpBeans.foreach { + + b => + s += b.getName() + "\t"; + } + s += "Used heap\tCommitted heap\tMax heap\tUsed nonheap\tCommitted nonheap\tMax nonheap\tTotal memory\tUsed CPU" + 
writer.write(s + "\n") + writer.flush() + + var mBeans: List[MemoryPoolMXBean] = null + var prevUpTime: Double = 0 + var prevProcessCPUTime: Double = 0 + var upTime: Double = 0 + var processCPUTime: Double = 0 + + var processID: String = "" + var datanodePID: String = "" + var nodemanagerPID: String = "" + +/* val commands = new java.util.Vector[String]() + commands.add("/bin/bash") + commands.add("-c") + commands.add("echo $PPID") + val pb=new java.lang.ProcessBuilder(commands) + val pr=pb.start() + pr.waitFor() + if (pr.exitValue()==0) { + val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pr.getInputStream())); + processID = outReader.readLine().trim() + println("Found process ID: " + processID) + } else { + println("Error while getting PID") + } +*/ + val pname = ManagementFactory.getRuntimeMXBean().getName() + processID = pname.substring(0, pname.indexOf('@')) + + val jps = new java.util.Vector[String]() + jps.add("/bin/bash") + jps.add("-c") + jps.add("jps") + val pbi=new java.lang.ProcessBuilder(jps) + val pri = pbi.start() + pri.waitFor() + if (pri.exitValue()==0) { + val outReader=new java.io.BufferedReader(new java.io.InputStreamReader(pri.getInputStream())); + var source = "" + source = outReader.readLine() + while(source != null) { + try { + val tokens = source.split(" +") + if(tokens(1).equals("DataNode")) { + datanodePID = tokens(0) + //println("Found datanode PID: " + datanodePID) + } + if(tokens(1).equals("NodeManager")) { + nodemanagerPID = tokens(0) + //println("Found nodemanager pid: " + nodemanagerPID) + } + } catch { case e: Exception => () } + finally { + source = outReader.readLine() + } + } + } else { + println("Error while getting jps output") + } + + + val ex = new ScheduledThreadPoolExecutor(1) + ex.setRemoveOnCancelPolicy(true) + val task = new Runnable { + var i: Long = 0 + override def run { + + s = "" + mpBeans.foreach { + b => + s += b.getUsage().getUsed() + "\t"; + } + s += memBean.getHeapMemoryUsage().getUsed() + "\t" + s += memBean.getHeapMemoryUsage().getCommitted() + "\t" + s += memBean.getHeapMemoryUsage().getMax() + "\t" + s += memBean.getNonHeapMemoryUsage().getUsed() + "\t" + s += memBean.getNonHeapMemoryUsage().getCommitted() + "\t" + s += memBean.getNonHeapMemoryUsage().getMax() + "\t" + // get used memory from jmap + // val jmapout = Seq("/bin/sh", "-c", "jmap -histo " + processID + " | tail -1").!!.trim + // s += jmapout.split(" +")(2) + + // record off heap memory usage + s += org.apache.spark.unsafe.Platform.TOTAL_BYTES; + + upTime = rtBean.getUptime() * 10000 + processCPUTime = osBean.getProcessCpuTime() + var elapsedCPU: Double = processCPUTime - prevProcessCPUTime + var elapsedTime: Double = upTime - prevUpTime + var usedCPU: Double = 0 + if (elapsedTime > 0.0) { + usedCPU = math.min(99.0, elapsedCPU / (elapsedTime * availableProcessors)) + avgUsedCPU += usedCPU + numberOfCPUSamples += 1 + } + s += "\t" + usedCPU.toString() + prevUpTime = upTime + prevProcessCPUTime = processCPUTime + + var res: (Double, Double) = (0, 0); + try { res = getProcessUsedMemoryAndCPU(processID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() } + + try { res = getProcessUsedMemoryAndCPU(datanodePID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() } + try { res = getProcessUsedMemoryAndCPU(nodemanagerPID); s += "\t" + res._1 + "\t" + res._2 } catch{ case e:Exception => e.printStackTrace() } + + if (i % TIMESTAMP_PERIOD == 0) { + var time: String = dateFormat.format(new 
Date()) + s += "\t" + time + } + + + if (i % JMAP_PERIOD == 0) { + var time: String = dateFormat.format(new Date()) + val pname = ManagementFactory.getRuntimeMXBean().getName() + val pid = pname.substring(0, pname.indexOf('@')) + val command = "jmap -histo " + pid + val result = command.!! + val writer1 = new FileWriter(new File(dirname_histo + "/" + "sparkOutput_worker_" + appId + "_" + executorId + "_" + time + ".txt"), true) + writer1.write(result) + writer1.flush() + writer1.close() + } + + + i = i + SAMPLING_PERIOD + writer.write(s + "\n") + writer.flush() + + } + } + val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS) + // han sampler 1 end + run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) + + // han sampler 2 begin + avgUsedCPU /= numberOfCPUSamples + logInfo("Average used CPU of executor " + executorId + ": " + avgUsedCPU) + + f.cancel(true) + writer.flush() + writer.close() + // han sampler 2 end } private def printUsageAndExit() = { @@ -273,3 +506,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } } + diff --git a/examples/src/main/scala/org/apache/spark/examples/Sort.scala b/examples/src/main/scala/org/apache/spark/examples/Sort.scala new file mode 100644 index 000000000000..352e69e563a2 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/Sort.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import scala.Tuple2
+import org.apache.spark.SparkConf
+
+import org.apache.spark._
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
+object Sort {
+
+  val SPACE = Pattern.compile(" ")
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("Usage: Sort <input> <output>");
+      System.exit(1);
+    }
+
+    val conf = new SparkConf().setAppName("Sort")
+
+    // han register kryo classes begin
+
+// val tuple3ArrayClass = classOf[Array[Tuple3[Any, Any, Any]]]
+// val anonClass = Class.forName("scala.reflect.ClassTag$$anon$1")
+// val javaClassClass = classOf[java.lang.Class[Any]]
+// val StringClass = classOf[java.lang.String]
+// val StringArrayClass = classOf[Array[java.lang.String]]
+
+// conf.registerKryoClasses(Array(tuple3ArrayClass, anonClass, javaClassClass, StringClass, StringArrayClass))
+
+    // han register kryo classes end
+
+    val sc = new SparkContext(conf)
+
+    // han sampler 1 begin
+    val SAMPLING_PERIOD: Long = 10
+    val TIMESTAMP_PERIOD: Long = 1000
+
+    var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+    val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+    val dir_application = new File(dirname_application)
+    if (!dir_application.exists())
+      dir_application.mkdirs()
+
+    val ex = new ScheduledThreadPoolExecutor(1)
+    val task = new Runnable {
+      var i: Long = 0
+      override def run {
+
+        // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+        sc.getExecutorStorageStatus.foreach {
+          es =>
+            val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+            val file = new File(filename)
+            val writer = new FileWriter(file, true)
+            if (!file.exists()) {
+              file.createNewFile()
+              writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+              writer.flush()
+              //writer.close()
+            }
+
+            var s = es.memUsed.toString()
+            //println(s)
+            if (i % TIMESTAMP_PERIOD == 0) {
+              i = 0
+              var time: String = dateFormat.format(new Date())
+              s += "\t" + time
+            }
+
+            writer.write(s + "\n")
+            writer.flush()
+            writer.close()
+        }
+        i = i + SAMPLING_PERIOD
+      }
+    }
+    val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+    // han sampler 1 end
+
+    val lines = sc.textFile(args(0), 1)
+    //val parallel = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)/2
+    val data = lines.map((_, 1))
+    //val partitioner = new HashPartitioner(partitions = parallel)
+    val sorted = data.sortByKey().map(_._1)
+
+    //val output = counts.collect()
+    //output.foreach(t => println(t._1 + ": " + t._2))
+    sorted.saveAsTextFile(args(1))
+
+    sc.stop()
+
+    // han sampler 2 begin
+    f.cancel(true)
+    // han sampler 2 end
+    sys.exit(0)
+  }
+}
+// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/SortDF.scala b/examples/src/main/scala/org/apache/spark/examples/SortDF.scala
new file mode 100644
index 000000000000..98c96e43c1df
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/SortDF.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import scala.Tuple2
+import org.apache.spark.SparkConf
+
+import org.apache.spark._
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+// han sampler import begin
+import java.io._
+import org.apache.spark.storage._
+import java.lang.System
+import java.util.Date
+import java.util.concurrent._
+import java.text._
+import scala.util.Properties
+// han sampler import end
+
+//case class Counts(word: String, count: Int)
+
+object SortDF {
+
+  val SPACE = Pattern.compile(" ")
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("Usage: SortDF <input> <output>");
+      System.exit(1);
+    }
+
+    val conf = new SparkConf().setAppName("Sort DF")
+
+    // han register kryo classes begin
+    /*
+    val tuple3ArrayClass = classOf[Array[Tuple3[Any, Any, Any]]]
+    val anonClass = Class.forName("scala.reflect.ClassTag$$anon$1")
+    val javaClassClass = classOf[java.lang.Class[Any]]
+    val StringClass = classOf[java.lang.String]
+    val StringArrayClass = classOf[Array[java.lang.String]]
+
+    conf.registerKryoClasses(Array(tuple3ArrayClass, anonClass, javaClassClass, StringClass, StringArrayClass))
+    */
+    // han register kryo classes end
+
+    val sc = new SparkContext(conf)
+
+    // han sampler 1 begin
+    val SAMPLING_PERIOD: Long = 10
+    val TIMESTAMP_PERIOD: Long = 1000
+
+    var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss")
+
+    val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId
+    val dir_application = new File(dirname_application)
+    if (!dir_application.exists())
+      dir_application.mkdirs()
+
+    val ex = new ScheduledThreadPoolExecutor(1)
+    val task = new Runnable {
+      var i: Long = 0
+      override def run {
+
+        // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1"))
+        sc.getExecutorStorageStatus.foreach {
+          es =>
+            val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt"
+            val file = new File(filename)
+            val writer = new FileWriter(file, true)
+            if (!file.exists()) {
+              file.createNewFile()
+              writer.write(sc.applicationId + "_" + es.blockManagerId + "\n")
+              writer.flush()
+              //writer.close()
+            }
+
+            var s = es.memUsed.toString()
+            //println(s)
+            if (i % TIMESTAMP_PERIOD == 0) {
+              i = 0
+              var time: String = dateFormat.format(new Date())
+              s += "\t" + time
+            }
+
+            writer.write(s + "\n")
+            writer.flush()
+            writer.close()
+        }
+        i = i + SAMPLING_PERIOD
+      }
+    }
+    val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS)
+    // han sampler 1 end
+
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    import sqlContext.implicits._
+
+    val lines = sc.textFile(args(0), 1)
+    //val parallel = sc.getConf.getInt("spark.default.parallelism", 
sc.defaultParallelism)/2 + val data = lines.map((_, 1)).map(t=>Counts(t._1, t._2)).toDF() + data.registerTempTable("data") + + //val partitioner = new HashPartitioner(partitions = parallel) + val sorted = data.sort("word").select("word") + + sorted.save(args(1)) + + //output.foreach(t => println(t._1 + ": " + t._2)) + + sc.stop() + + // han sampler 2 begin + f.cancel(true) + // hand sampler 2 end + sys.exit(0) + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/WordCount.scala b/examples/src/main/scala/org/apache/spark/examples/WordCount.scala new file mode 100644 index 000000000000..09fd8a03267a --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/WordCount.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples + +import scala.Tuple2 +import org.apache.spark.SparkConf + +import org.apache.spark._ + +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + +// han sampler import begin +import java.io._ +import org.apache.spark.storage._ +import java.lang.System +import java.util.Date +import java.util.concurrent._ +import java.text._ +import scala.util.Properties +// han sampler import end + +object WordCount { + + val SPACE = Pattern.compile(" ") + + def main(args: Array[String]) { + if (args.length < 1) { + println("Usage: WordCount "); + System.exit(1); + } + + val conf = new SparkConf().setAppName("WordCount") + val sc = new SparkContext(conf) + + // han sampler 1 begin + val SAMPLING_PERIOD: Long = 10 + val TIMESTAMP_PERIOD: Long = 1000 + + var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss") + + val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId + val dir_application = new File(dirname_application) + if (!dir_application.exists()) + dir_application.mkdirs() + + val ex = new ScheduledThreadPoolExecutor(1) + val task = new Runnable { + var i: Long = 0 + override def run { + + // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1")) + sc.getExecutorStorageStatus.foreach { + es => + val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt" + val file = new File(filename) + val writer = new FileWriter(file, true) + if (!file.exists()) { + file.createNewFile() + writer.write(sc.applicationId + "_" + es.blockManagerId + "\n") + writer.flush() + //writer.close() + } + var s = es.memUsed.toString() + //println(s) + if (i % TIMESTAMP_PERIOD == 0) { + i = 0 + var time: String = dateFormat.format(new Date()) + s += "\t" + time + } + + writer.write(s + "\n") + writer.flush() + writer.close() + } + i = i + 
SAMPLING_PERIOD + } + } + val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS) + // han sampler 1 end + + val lines = sc.textFile(args(0), 1) + + val words = lines.flatMap(l => SPACE.split(l)) + val ones = words.map(w => (w,1)) + val counts = ones.reduceByKey(_ + _) + + //val output = counts.collect() + //output.foreach(t => println(t._1 + ": " + t._2)) + counts.saveAsTextFile(args(1)) + + sc.stop() + + // han sampler 2 begin + f.cancel(true) + // hand sampler 2 end + sys.exit(0) + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala b/examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala new file mode 100644 index 000000000000..c252d37d5afa --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/WordCountDF.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples + +import scala.Tuple2 +import org.apache.spark.SparkConf + +import org.apache.spark._ + +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + +// han sampler import begin +import java.io._ +import org.apache.spark.storage._ +import java.lang.System +import java.util.Date +import java.util.concurrent._ +import java.text._ +import scala.util.Properties +// han sampler import end + +case class Counts(word: String, one: Int) + +object WordCountDF { + + val SPACE = Pattern.compile(" ") + + def main(args: Array[String]) { + if (args.length < 1) { + println("Usage: WordCount "); + System.exit(1); + } + + val conf = new SparkConf().setAppName("WordCount DF") + val sc = new SparkContext(conf) + + // han sampler 1 begin + val SAMPLING_PERIOD: Long = 10 + val TIMESTAMP_PERIOD: Long = 1000 + + var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss") + + val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId + val dir_application = new File(dirname_application) + if (!dir_application.exists()) + dir_application.mkdirs() + + val ex = new ScheduledThreadPoolExecutor(1) + val task = new Runnable { + var i: Long = 0 + override def run { + + // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1")) + sc.getExecutorStorageStatus.foreach { + es => + val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt" + val file = new File(filename) + val writer = new FileWriter(file, true) + if (!file.exists()) { + file.createNewFile() + writer.write(sc.applicationId + "_" + es.blockManagerId + "\n") + writer.flush() + //writer.close() + } + var s = es.memUsed.toString() + //println(s) + if (i % TIMESTAMP_PERIOD == 0) { + i = 0 + 
var time: String = dateFormat.format(new Date()) + s += "\t" + time + } + + writer.write(s + "\n") + writer.flush() + writer.close() + } + i = i + SAMPLING_PERIOD + } + } + val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS) + // han sampler 1 end + + val sqlContext = new org.apache.spark.sql.SQLContext(sc) + import sqlContext.implicits._ + + val lines = sc.textFile(args(0), 1) + + val words = lines.flatMap(l => SPACE.split(l)) + val ones = words.map(w => (w,1)).map(t => Counts(t._1, t._2)).toDF() + ones.registerTempTable("ones") + + val counts = sqlContext.sql("SELECT word, COUNT(*) FROM ones GROUP BY word") + + val output = counts.save(args(1)) + //output.foreach(t => println(t._1 + ": " + t._2)) + + sc.stop() + + // han sampler 2 begin + f.cancel(true) + // hand sampler 2 end + sys.exit(0) + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala index 8dd6c9706e7d..763cffcbf511 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -25,6 +25,16 @@ import org.apache.spark.graphx._ import org.apache.spark.graphx.lib._ import org.apache.spark.graphx.PartitionStrategy._ +// han sampler import begin +import java.io._ +import org.apache.spark.storage._ +import java.lang.System +import java.util.Date +import java.util.concurrent._ +import java.text._ +import scala.util.Properties +// han sampler import end + /** * Driver program for running graph algorithms. */ @@ -52,12 +62,39 @@ object Analytics extends Logging { val options = mutable.Map(optionsList: _*) val conf = new SparkConf() + // han register kryo classes begin + /* + val intArrayClass = classOf[Array[Int]] + val doubleArrayClass = classOf[Array[Double]] + val EdgePartitionClass = Class.forName("org.apache.spark.graphx.impl.EdgePartition$mcI$sp") + val NonePartitionClass = Class.forName("scala.None$") + val ManifestFactory_anon_9Class = Class.forName("scala.reflect.ManifestFactory$$anon$9") + val GraphXPrimitiveKeyOpenHashMap_mcJI_spClass = Class.forName("org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp") + val ManifestFactory_anon_10Class = Class.forName("scala.reflect.ManifestFactory$$anon$10") + val GraphXPrimitiveKeyOpenHashMap_anonfun_1Class = Class.forName("org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$$anonfun$1") + val OpenHashSet_LongHasherClass = Class.forName("org.apache.spark.util.collection.OpenHashSet$LongHasher") + val GraphXPrimitiveKeyOpenHashMap_anonfun_2Class = Class.forName("org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$$anonfun$2") + val ShippableVertexPartitionClass = Class.forName("org.apache.spark.graphx.impl.ShippableVertexPartition") + val RoutingTablePartitionClass = Class.forName("org.apache.spark.graphx.impl.RoutingTablePartition") + val Tuple3ArrayClass = classOf[Array[scala.Tuple3[Any, Any, Any]]] + val ManifestFactory_anon_12Class = Class.forName("scala.reflect.ManifestFactory$$anon$12") + + + conf.registerKryoClasses(Array(intArrayClass, doubleArrayClass, EdgePartitionClass, NonePartitionClass, ManifestFactory_anon_9Class, GraphXPrimitiveKeyOpenHashMap_mcJI_spClass, ManifestFactory_anon_10Class, GraphXPrimitiveKeyOpenHashMap_anonfun_1Class, OpenHashSet_LongHasherClass, GraphXPrimitiveKeyOpenHashMap_anonfun_2Class, ShippableVertexPartitionClass, 
RoutingTablePartitionClass, Tuple3ArrayClass, ManifestFactory_anon_12Class)) + */ + // han register kryo classes end + GraphXUtils.registerKryoClasses(conf) val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { println("Set the number of edge partitions using --numEPart.") sys.exit(1) } + + // han + var storageLevel: StorageLevel = options.remove("storageLevel") + .map(StorageLevel.fromString(_)).getOrElse(null) + val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") .map(PartitionStrategy.fromString(_)) val edgeStorageLevel = options.remove("edgeStorageLevel") @@ -68,7 +105,11 @@ object Analytics extends Logging { taskType match { case "pagerank" => val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) - val outFname = options.remove("output").getOrElse("") + + // han + // val outFname = options.remove("output").getOrElse("") + var outFname = options.remove("output").getOrElse("") + val numIterOpt = options.remove("numIter").map(_.toInt) options.foreach { @@ -81,19 +122,88 @@ object Analytics extends Logging { val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) + // han + if (!outFname.isEmpty) { + outFname += sc.applicationId + } + + // han sampler 1 begin + val SAMPLING_PERIOD: Long = 10 + val TIMESTAMP_PERIOD: Long = 1000 + + var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss") + + val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId + + val dir_application = new File(dirname_application) + if (!dir_application.exists()) + dir_application.mkdirs() + + val ex = new ScheduledThreadPoolExecutor(1) + val task = new Runnable { + var i: Long = 0 + override def run { + + // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1")) + sc.getExecutorStorageStatus.foreach { + es => + val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt" + val file = new File(filename) + val writer = new FileWriter(file, true) + if (!file.exists()) { + file.createNewFile() + writer.write(sc.applicationId + "_" + es.blockManagerId + "\n") + writer.flush() + writer.close() + } + var s = es.memUsed.toString() + + s += "\t" + es.maxMem.toString() + s += "\t" + es.offHeapUsed.toString() + + //println(s) + if (i % TIMESTAMP_PERIOD == 0) { + i = 0 + var time: String = dateFormat.format(new Date()) + s += "\t" + time + } + + writer.write(s + "\n") + writer.flush() + writer.close() + } + i = i + SAMPLING_PERIOD + } + } + val f = ex.scheduleAtFixedRate(task, 0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS) + // han sampler 1 end + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, numEdgePartitions = numEPart, - edgeStorageLevel = edgeStorageLevel, - vertexStorageLevel = vertexStorageLevel).cache() + + // han + // edgeStorageLevel = edgeStorageLevel, + // vertexStorageLevel = vertexStorageLevel) + edgeStorageLevel = storageLevel, + vertexStorageLevel = storageLevel) + .cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) println("GRAPHX: Number of vertices " + graph.vertices.count) println("GRAPHX: Number of edges " + graph.edges.count) + // han + // val pr = (numIterOpt match { + // case Some(numIter) => PageRank.run(graph, numIter) + // case None => PageRank.runUntilConvergence(graph, tol) + // }).vertices + // .cache() val pr = (numIterOpt match { - case Some(numIter) => PageRank.run(graph, numIter) - case None => PageRank.runUntilConvergence(graph, 
tol) - }).vertices.cache() + case Some(numIter) => PageRank.run_storageLevel(graph = graph, numIter = numIter, storageLevel = storageLevel) + case None => PageRank.runUntilConvergence_storageLevel(graph = graph, tol = tol, storageLevel = storageLevel) + }).vertices + .persist(storageLevel) println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) @@ -104,6 +214,10 @@ object Analytics extends Logging { sc.stop() + // han sampler 2 begin + f.cancel(true) + // han sampler 2 end + case "cc" => options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk new file mode 100644 index 000000000000..8dd6c9706e7d --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala_bk @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +import scala.collection.mutable +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib._ +import org.apache.spark.graphx.PartitionStrategy._ + +/** + * Driver program for running graph algorithms. 
+ */ +object Analytics extends Logging { + + def main(args: Array[String]): Unit = { + if (args.length < 2) { + System.err.println( + "Usage: Analytics --numEPart= [other options]") + System.err.println("Supported 'taskType' as follows:") + System.err.println(" pagerank Compute PageRank") + System.err.println(" cc Compute the connected components of vertices") + System.err.println(" triangles Count the number of triangles") + System.exit(1) + } + + val taskType = args(0) + val fname = args(1) + val optionsList = args.drop(2).map { arg => + arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } + } + val options = mutable.Map(optionsList: _*) + + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + + val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { + println("Set the number of edge partitions using --numEPart.") + sys.exit(1) + } + val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") + .map(PartitionStrategy.fromString(_)) + val edgeStorageLevel = options.remove("edgeStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + val vertexStorageLevel = options.remove("vertexStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + + taskType match { + case "pagerank" => + val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) + val outFname = options.remove("output").getOrElse("") + val numIterOpt = options.remove("numIter").map(_.toInt) + + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| PageRank |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) + + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + numEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + println("GRAPHX: Number of vertices " + graph.vertices.count) + println("GRAPHX: Number of edges " + graph.edges.count) + + val pr = (numIterOpt match { + case Some(numIter) => PageRank.run(graph, numIter) + case None => PageRank.runUntilConvergence(graph, tol) + }).vertices.cache() + + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) + + if (!outFname.isEmpty) { + logWarning("Saving pageranks of pages to " + outFname) + pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname) + } + + sc.stop() + + case "cc" => + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + println("======================================") + println("| Connected Components |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) + val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, + numEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() + val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) + + val cc = ConnectedComponents.run(graph) + println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct()) + sc.stop() + + case "triangles" => + options.foreach { + case (opt, _) => throw new IllegalArgumentException("Invalid 
option: " + opt) + } + + println("======================================") + println("| Triangle Count |") + println("======================================") + + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) + val graph = GraphLoader.edgeListFile(sc, fname, + canonicalOrientation = true, + numEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) + // TriangleCount requires the graph to be partitioned + .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() + val triangles = TriangleCount.run(graph) + println("Triangles: " + triangles.vertices.map { + case (vid, data) => data.toLong + }.reduce(_ + _) / 3) + sc.stop() + + case _ => + println("Invalid task type.") + } + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala index da3ffca1a6f2..030b7a549894 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -24,7 +24,13 @@ package org.apache.spark.examples.graphx */ object LiveJournalPageRank { def main(args: Array[String]) { - if (args.length < 1) { + + // han + println("Application starts:\n" + System.nanoTime()) + + // han + // if (args.length < 1) { + if (args.length < 2) { System.err.println( "Usage: LiveJournalPageRank \n" + " --numEPart=\n" + @@ -36,11 +42,20 @@ object LiveJournalPageRank { " If specified, the file to write the ranks to.\n" + " [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " + "CanonicalRandomVertexCut]\n" + - " The way edges are assigned to edge partitions. Default is RandomVertexCut.") + " The way edges are assigned to edge partitions. Default is RandomVertexCut." + + // han + " [--numIter=]" + + " Max number of iterations" + + " --storageLevel=\n" + + "Storage level of RDD caching (NONE, MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY)") System.exit(-1) } Analytics.main(args.patch(0, List("pagerank"), 0)) + + // han + println("Application completes:\n" + System.nanoTime()) + sys.exit(0) } } // scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk new file mode 100644 index 000000000000..da3ffca1a6f2 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala_bk @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// scalastyle:off println +package org.apache.spark.examples.graphx + +/** + * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from + * http://snap.stanford.edu/data/soc-LiveJournal1.html. + */ +object LiveJournalPageRank { + def main(args: Array[String]) { + if (args.length < 1) { + System.err.println( + "Usage: LiveJournalPageRank \n" + + " --numEPart=\n" + + " The number of partitions for the graph's edge RDD.\n" + + " [--tol=]\n" + + " The tolerance allowed at convergence (smaller => more accurate). Default is " + + "0.001.\n" + + " [--output=]\n" + + " If specified, the file to write the ranks to.\n" + + " [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " + + "CanonicalRandomVertexCut]\n" + + " The way edges are assigned to edge partitions. Default is RandomVertexCut.") + System.exit(-1) + } + + Analytics.main(args.patch(0, List("pagerank"), 0)) + } +} +// scalastyle:on println diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index 52b237fc1509..9b5a499dae93 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -23,6 +23,11 @@ import scala.language.postfixOps import org.apache.spark.Logging import org.apache.spark.graphx._ +// han import begin +import org.apache.spark.storage.StorageLevel +import scala.collection.mutable.MutableList +// han import end + /** * PageRank algorithm implementation. There are two implementations of PageRank implemented. * @@ -61,6 +66,8 @@ import org.apache.spark.graphx._ */ object PageRank extends Logging { + // han + private var storageLevel: StorageLevel = null /** * Run PageRank for a fixed number of iterations returning a graph @@ -79,9 +86,17 @@ object PageRank extends Logging { */ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = - { - runWithOptions(graph, numIter, resetProb) - } + { + runWithOptions(graph, numIter, resetProb) + } + + // han + def run_storageLevel[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, + resetProb: Double = 0.15, storageLevel: StorageLevel): Graph[Double, Double] = + { + this.storageLevel = storageLevel + run(graph, numIter, resetProb) + } /** * Run PageRank for a fixed number of iterations returning a graph @@ -101,62 +116,87 @@ object PageRank extends Logging { * */ def runWithOptions[VD: ClassTag, ED: ClassTag]( - graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, - srcId: Option[VertexId] = None): Graph[Double, Double] = - { - val personalized = srcId isDefined - val src: VertexId = srcId.getOrElse(-1L) - - // Initialize the PageRank graph with each edge attribute having - // weight 1/outDegree and each vertex with attribute resetProb. - // When running personalized pagerank, only the source vertex - // has an attribute resetProb. All others are set to 0. 
- var rankGraph: Graph[Double, Double] = graph - // Associate the degree with each vertex - .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } - // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src ) - // Set the vertex attributes to the initial pagerank values - .mapVertices { (id, attr) => - if (!(id != src && personalized)) resetProb else 0.0 - } + graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, + srcId: Option[VertexId] = None): Graph[Double, Double] = + { + // Initialize the PageRank graph with each edge attribute having + // weight 1/outDegree and each vertex with attribute 1.0. + var rankGraph: Graph[Double, Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } + // Set the weight on the edges based on the degree + .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src) + // Set the vertex attributes to the initial pagerank values + .mapVertices((id, attr) => resetProb) - def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 } + val personalized = srcId isDefined + val src: VertexId = srcId.getOrElse(-1L) + def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 } - var iteration = 0 - var prevRankGraph: Graph[Double, Double] = null - while (iteration < numIter) { - rankGraph.cache() + var iteration = 0 + var prevRankGraph: Graph[Double, Double] = null - // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and - // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation. - val rankUpdates = rankGraph.aggregateMessages[Double]( - ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src) + // han + val iterationStartTime = System.nanoTime() + println("Iteration 1 starts:\n" + iterationStartTime) + var iDurs: MutableList[(Int, Double)] = new MutableList[(Int, Double)] + var iterationEndTime_last: Long = iterationStartTime - // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices - // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the - // edge partitions. - prevRankGraph = rankGraph - val rPrb = if (personalized) { - (src: VertexId , id: VertexId) => resetProb * delta(src, id) - } else { - (src: VertexId, id: VertexId) => resetProb - } + while (iteration < numIter) { + rankGraph + // han + //.cache() + .persist(storageLevel) - rankGraph = rankGraph.joinVertices(rankUpdates) { - (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum - }.cache() + // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and + // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation. + val rankUpdates = rankGraph.aggregateMessages[Double]( + ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src) - rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices - logInfo(s"PageRank finished iteration $iteration.") - prevRankGraph.vertices.unpersist(false) - prevRankGraph.edges.unpersist(false) + // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices + // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the + // edge partitions. 
+ prevRankGraph = rankGraph + val rPrb = if (personalized) { + (src: VertexId, id: VertexId) => resetProb * delta(src, id) + } else { + (src: VertexId, id: VertexId) => resetProb + } - iteration += 1 - } + rankGraph = rankGraph.joinVertices(rankUpdates) { + (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum + } + // han + //.cache() + .persist(storageLevel) - rankGraph - } + rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices + logInfo(s"PageRank finished iteration $iteration.") + + prevRankGraph.vertices.unpersist(false) + prevRankGraph.edges.unpersist(false) + + // han + val iterationEndTime = System.nanoTime() + var iDuration: Double = (iterationEndTime - iterationEndTime_last) / 1000000000.0 + iDurs += ((iteration, iDuration)) + println("Iteration " + iteration + ": " + iDuration) + iterationEndTime_last = iterationEndTime + + iteration += 1 + } + + // han + println("Iterations count:\n" + iteration) + println("Iterations complete:\n" + System.nanoTime()) + println("Iterations:") + iDurs.foreach { + id => + println("Iteration " + id._1 + ": " + id._2) + } + + rankGraph + } /** * Run a dynamic version of PageRank returning a graph with vertex attributes containing the @@ -174,9 +214,17 @@ object PageRank extends Logging { */ def runUntilConvergence[VD: ClassTag, ED: ClassTag]( graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = - { + { runUntilConvergenceWithOptions(graph, tol, resetProb) - } + } + + // han + def runUntilConvergence_storageLevel[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15, storageLevel: StorageLevel): Graph[Double, Double] = + { + this.storageLevel = storageLevel + runUntilConvergence(graph, tol, resetProb) + } /** * Run a dynamic version of PageRank returning a graph with vertex attributes containing the @@ -193,73 +241,74 @@ object PageRank extends Logging { * @return the graph containing with each vertex containing the PageRank and each edge * containing the normalized weight. */ + def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag]( - graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15, - srcId: Option[VertexId] = None): Graph[Double, Double] = - { - val personalized = srcId.isDefined - val src: VertexId = srcId.getOrElse(-1L) - - // Initialize the pagerankGraph with each edge attribute - // having weight 1/outDegree and each vertex with attribute 1.0. - val pagerankGraph: Graph[(Double, Double), Double] = graph - // Associate the degree with each vertex - .outerJoinVertices(graph.outDegrees) { - (vid, vdata, deg) => deg.getOrElse(0) - } - // Set the weight on the edges based on the degree - .mapTriplets( e => 1.0 / e.srcAttr ) - // Set the vertex attributes to (initalPR, delta = 0) - .mapVertices { (id, attr) => - if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0) + graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15, + srcId: Option[VertexId] = None): Graph[Double, Double] = + { + + // Initialize the pagerankGraph with each edge attribute + // having weight 1/outDegree and each vertex with attribute 1.0. 
+ val pagerankGraph: Graph[(Double, Double), Double] = graph + // Associate the degree with each vertex + .outerJoinVertices(graph.outDegrees) { + (vid, vdata, deg) => deg.getOrElse(0) + } + // Set the weight on the edges based on the degree + .mapTriplets(e => 1.0 / e.srcAttr) + // Set the vertex attributes to (initalPR, delta = 0) + .mapVertices((id, attr) => (0.0, 0.0)) + // han + // .cache() + .persist(storageLevel) + + val personalized = srcId.isDefined + val src: VertexId = srcId.getOrElse(-1L) + + // Define the three functions needed to implement PageRank in the GraphX + // version of Pregel + def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { + val (oldPR, lastDelta) = attr + val newPR = oldPR + (1.0 - resetProb) * msgSum + (newPR, newPR - oldPR) } - .cache() - - // Define the three functions needed to implement PageRank in the GraphX - // version of Pregel - def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = { - val (oldPR, lastDelta) = attr - val newPR = oldPR + (1.0 - resetProb) * msgSum - (newPR, newPR - oldPR) - } - def personalizedVertexProgram(id: VertexId, attr: (Double, Double), - msgSum: Double): (Double, Double) = { - val (oldPR, lastDelta) = attr - var teleport = oldPR - val delta = if (src==id) 1.0 else 0.0 - teleport = oldPR*delta + def personalizedVertexProgram(id: VertexId, attr: (Double, Double), + msgSum: Double): (Double, Double) = { + val (oldPR, lastDelta) = attr + var teleport = oldPR + val delta = if (src == id) 1.0 else 0.0 + teleport = oldPR * delta - val newPR = teleport + (1.0 - resetProb) * msgSum - val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR - (newPR, newDelta) - } + val newPR = teleport + (1.0 - resetProb) * msgSum + (newPR, newPR - oldPR) + } - def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { - if (edge.srcAttr._2 > tol) { - Iterator((edge.dstId, edge.srcAttr._2 * edge.attr)) - } else { - Iterator.empty + def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = { + if (edge.srcAttr._2 > tol) { + Iterator((edge.dstId, edge.srcAttr._2 * edge.attr)) + } else { + Iterator.empty + } } - } - def messageCombiner(a: Double, b: Double): Double = a + b + def messageCombiner(a: Double, b: Double): Double = a + b - // The initial message received by all vertices in PageRank - val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb) + // The initial message received by all vertices in PageRank + val initialMessage = resetProb / (1.0 - resetProb) - // Execute a dynamic version of Pregel. - val vp = if (personalized) { - (id: VertexId, attr: (Double, Double), msgSum: Double) => - personalizedVertexProgram(id, attr, msgSum) - } else { - (id: VertexId, attr: (Double, Double), msgSum: Double) => - vertexProgram(id, attr, msgSum) - } + // Execute a dynamic version of Pregel. 
+ val vp = if (personalized) { + (id: VertexId, attr: (Double, Double), msgSum: Double) => + personalizedVertexProgram(id, attr, msgSum) + } else { + (id: VertexId, attr: (Double, Double), msgSum: Double) => + vertexProgram(id, attr, msgSum) + } - Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)( - vp, sendMessage, messageCombiner) - .mapVertices((vid, attr) => attr._1) - } // end of deltaPageRank + Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)( + vp, sendMessage, messageCombiner) + .mapVertices((vid, attr) => attr._1) + } // end of deltaPageRank } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 2895db7c9061..ffd5c90021ff 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -20,15 +20,19 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer import org.apache.spark.Logging -import org.apache.spark.annotation.{Experimental, Since} -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.linalg.BLAS.{axpy, scal} +import org.apache.spark.annotation.{ Experimental, Since } +import org.apache.spark.mllib.linalg.{ Vector, Vectors } +import org.apache.spark.mllib.linalg.BLAS.{ axpy, scal } import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils import org.apache.spark.util.random.XORShiftRandom +// han import begin +import scala.collection.mutable.MutableList +// han import end + /** * K-means clustering with support for multiple parallel runs and a k-means++ like initialization * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, @@ -69,6 +73,13 @@ class KMeans private ( this } + // han + private var storageLevel: StorageLevel = null + def setStorageLevel(storageLevel: StorageLevel): this.type = { + this.storageLevel = storageLevel + this + } + /** * Maximum number of iterations to run. */ @@ -107,7 +118,7 @@ class KMeans private ( * Number of runs of the algorithm to execute in parallel. */ @Since("1.4.0") - @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0") + @Experimental def getRuns: Int = runs /** @@ -117,7 +128,7 @@ class KMeans private ( * return the best clustering found over any run. Default: 1. */ @Since("0.8.0") - @deprecated("Support for runs is deprecated. This param will have no effect in 1.7.0.", "1.6.0") + @Experimental def setRuns(runs: Int): this.type = { if (runs <= 0) { throw new IllegalArgumentException("Number of runs must be positive") @@ -206,11 +217,17 @@ class KMeans private ( // Compute squared norms and cache them. val norms = data.map(Vectors.norm(_, 2.0)) - norms.persist() - val zippedData = data.zip(norms).map { case (v, norm) => - new VectorWithNorm(v, norm) + // han + // norms.persist() + norms.persist(storageLevel) + + val zippedData = data.zip(norms).map { + case (v, norm) => + new VectorWithNorm(v, norm) } val model = runAlgorithm(zippedData) + + // han norms.unpersist() // Warn at the end of the run as well, for increased visibility. 
@@ -261,6 +278,13 @@ class KMeans private ( var iteration = 0 val iterationStartTime = System.nanoTime() + + // han + println("Iteration 1 starts:\n" + iterationStartTime) + + // han + var iDurs: MutableList[(Int, Double)] = new MutableList[(Int, Double)] + var iterationEndTime_last: Long = iterationStartTime // Execute iterations of Lloyd's algorithm until all runs have converged while (iteration < maxIterations && !activeRuns.isEmpty) { @@ -323,32 +347,53 @@ class KMeans private ( } costs(run) = costAccums(i).value } - activeRuns = activeRuns.filter(active(_)) + + // han + val iterationEndTime = System.nanoTime() + var iDuration: Double = (iterationEndTime - iterationEndTime_last) / 1000000000.0 + iDurs += ((iteration, iDuration)) + println("Iteration " + iteration + ": " + iDuration) + iterationEndTime_last = iterationEndTime + iteration += 1 } + + // han + println("Iterations complete:\n" + System.nanoTime()) + val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.") if (iteration == maxIterations) { logInfo(s"KMeans reached the max number of iterations: $maxIterations.") + // han + println(s"KMeans reached the max number of iterations: $maxIterations.") } else { logInfo(s"KMeans converged in $iteration iterations.") + // han + println(s"KMeans converged in $iteration iterations.") } val (minCost, bestRun) = costs.zipWithIndex.min logInfo(s"The cost for the best run is $minCost.") + // han + println("Iterations:") + iDurs.foreach { + id => + println("Iteration " + id._1 + ": " + id._2) + } + new KMeansModel(centers(bestRun).map(_.vector)) } /** * Initialize `runs` sets of cluster centers at random. */ - private def initRandom(data: RDD[VectorWithNorm]) - : Array[Array[VectorWithNorm]] = { + private def initRandom(data: RDD[VectorWithNorm]): Array[Array[VectorWithNorm]] = { // Sample all the cluster centers in one pass to avoid repeated scans val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => @@ -365,8 +410,7 @@ class KMeans private ( * * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. */ - private def initKMeansParallel(data: RDD[VectorWithNorm]) - : Array[Array[VectorWithNorm]] = { + private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[Array[VectorWithNorm]] = { // Initialize empty centers and point costs. 
val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm]) var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity)) @@ -393,11 +437,21 @@ class KMeans private ( while (step < initializationSteps) { val bcNewCenters = data.context.broadcast(newCenters) val preCosts = costs - costs = data.zip(preCosts).map { case (point, cost) => + + // han + // costs = data.zip(preCosts).map { + // case (point, cost) => + // Array.tabulate(runs) { r => + // math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) + // } + // }.persist(StorageLevel.MEMORY_AND_DISK) + costs = data.zip(preCosts).map { + case (point, cost) => Array.tabulate(runs) { r => math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) } - }.persist(StorageLevel.MEMORY_AND_DISK) + }.persist(storageLevel) + val sumCosts = costs .aggregate(new Array[Double](runs))( seqOp = (s, v) => { @@ -417,26 +471,31 @@ class KMeans private ( r += 1 } s0 - } - ) + }) + + // han preCosts.unpersist(blocking = false) val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - pointsWithCosts.flatMap { case (p, c) => - val rs = (0 until runs).filter { r => - rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r) - } - if (rs.length > 0) Some(p, rs) else None + pointsWithCosts.flatMap { + case (p, c) => + val rs = (0 until runs).filter { r => + rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r) + } + if (rs.length > 0) Some(p, rs) else None } }.collect() mergeNewCenters() - chosen.foreach { case (p, rs) => - rs.foreach(newCenters(_) += p.toDense) + chosen.foreach { + case (p, rs) => + rs.foreach(newCenters(_) += p.toDense) } step += 1 } mergeNewCenters() + + // han costs.unpersist(blocking = false) // Finally, we might have a set of more than k candidate centers for each run; weigh each @@ -458,7 +517,6 @@ class KMeans private ( } } - /** * Top-level methods for calling K-means clustering. */ @@ -483,12 +541,12 @@ object KMeans { */ @Since("1.3.0") def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int, - initializationMode: String, - seed: Long): KMeansModel = { + data: RDD[Vector], + k: Int, + maxIterations: Int, + runs: Int, + initializationMode: String, + seed: Long): KMeansModel = { new KMeans().setK(k) .setMaxIterations(maxIterations) .setRuns(runs) @@ -508,11 +566,11 @@ object KMeans { */ @Since("0.8.0") def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int, - initializationMode: String): KMeansModel = { + data: RDD[Vector], + k: Int, + maxIterations: Int, + runs: Int, + initializationMode: String): KMeansModel = { new KMeans().setK(k) .setMaxIterations(maxIterations) .setRuns(runs) @@ -525,9 +583,9 @@ object KMeans { */ @Since("0.8.0") def train( - data: RDD[Vector], - k: Int, - maxIterations: Int): KMeansModel = { + data: RDD[Vector], + k: Int, + maxIterations: Int): KMeansModel = { train(data, k, maxIterations, 1, K_MEANS_PARALLEL) } @@ -536,10 +594,10 @@ object KMeans { */ @Since("0.8.0") def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int): KMeansModel = { + data: RDD[Vector], + k: Int, + maxIterations: Int, + runs: Int): KMeansModel = { train(data, k, maxIterations, runs, K_MEANS_PARALLEL) } @@ -547,8 +605,8 @@ object KMeans { * Returns the index of the closest center to the given point, as well as the squared distance. 
*/ private[mllib] def findClosest( - centers: TraversableOnce[VectorWithNorm], - point: VectorWithNorm): (Int, Double) = { + centers: TraversableOnce[VectorWithNorm], + point: VectorWithNorm): (Int, Double) = { var bestDistance = Double.PositiveInfinity var bestIndex = 0 var i = 0 @@ -573,8 +631,8 @@ object KMeans { * Returns the K-means cost of a given point against the given cluster centers. */ private[mllib] def pointCost( - centers: TraversableOnce[VectorWithNorm], - point: VectorWithNorm): Double = + centers: TraversableOnce[VectorWithNorm], + point: VectorWithNorm): Double = findClosest(centers, point)._2 /** @@ -582,8 +640,8 @@ object KMeans { * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. */ private[clustering] def fastSquaredDistance( - v1: VectorWithNorm, - v2: VectorWithNorm): Double = { + v1: VectorWithNorm, + v2: VectorWithNorm): Double = { MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) } @@ -601,8 +659,7 @@ object KMeans { * * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] */ -private[clustering] -class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable { +private[clustering] class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable { def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0)) diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 0d6b215fe5aa..e11132c76753 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -19,6 +19,8 @@ import java.lang.reflect.Field; +import java.util.concurrent.ConcurrentHashMap; + import sun.misc.Unsafe; public final class Platform { @@ -33,6 +35,10 @@ public final class Platform { public static final int DOUBLE_ARRAY_OFFSET; + // mayuresh: hacky way to find offheap usage + private static ConcurrentHashMap ADDRESS_SIZE_MAP = new ConcurrentHashMap(); + public static volatile long TOTAL_BYTES = 0L; + public static int getInt(Object object, long offset) { return _UNSAFE.getInt(object, offset); } @@ -98,36 +104,28 @@ public static void putObjectVolatile(Object object, long offset, Object value) { } public static long allocateMemory(long size) { - return _UNSAFE.allocateMemory(size); + long address = _UNSAFE.allocateMemory(size); + // mayuresh: hacky way of tracking offheap usage + TOTAL_BYTES += size; + ADDRESS_SIZE_MAP.put(address, size); + return address; } public static void freeMemory(long address) { + // mayuresh: hacky way of tracking offheap usage + TOTAL_BYTES -= ADDRESS_SIZE_MAP.get(address); + ADDRESS_SIZE_MAP.remove(address); _UNSAFE.freeMemory(address); } public static void copyMemory( Object src, long srcOffset, Object dst, long dstOffset, long length) { - // Check if dstOffset is before or after srcOffset to determine if we should copy - // forward or backwards. This is necessary in case src and dst overlap. 
- if (dstOffset < srcOffset) { - while (length > 0) { - long size = Math.min(length, UNSAFE_COPY_THRESHOLD); - _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); - length -= size; - srcOffset += size; - dstOffset += size; - } - } else { - srcOffset += length; - dstOffset += length; - while (length > 0) { - long size = Math.min(length, UNSAFE_COPY_THRESHOLD); - srcOffset -= size; - dstOffset -= size; - _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); - length -= size; - } - + while (length > 0) { + long size = Math.min(length, UNSAFE_COPY_THRESHOLD); + _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); + length -= size; + srcOffset += size; + dstOffset += size; } } From bf40a4d0c5bdd2f85d70d7b82f4a71f23ab5036a Mon Sep 17 00:00:00 2001 From: Mayuresh Kunjir Date: Wed, 23 Dec 2015 22:29:47 -0500 Subject: [PATCH 2/2] Reverted a change in executor backend. Added DenseKMeans. --- .../CoarseGrainedExecutorBackend.scala | 47 ++--- .../apache/spark/memory/MemoryManager.scala | 5 + examples/pom.xml | 5 + .../spark/examples/mllib/DenseKMeans.scala | 164 ++++++++++++++++-- .../org/apache/spark/unsafe/Platform.java | 29 +++- 5 files changed, 211 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 96acb6b6db15..8e169e78fc1d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -50,17 +50,14 @@ import scala.sys.process._ // han sampler import end private[spark] class CoarseGrainedExecutorBackend( - override val rpcEnv: RpcEnv, - driverUrl: String, - executorId: String, - hostPort: String, - cores: Int, - userClassPath: Seq[URL], - env: SparkEnv) - extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { - - Utils.checkHostPort(hostPort, "Expected hostport") - + override val rpcEnv: RpcEnv, + driverUrl: String, + executorId: String, + hostPort: String, + cores: Int, + userClassPath: Seq[URL], + env: SparkEnv) + extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None @@ -73,12 +70,12 @@ private[spark] class CoarseGrainedExecutorBackend( rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) - ref.ask[RegisteredExecutor.type]( + ref.ask[RegisterExecutorResponse]( RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => Utils.tryLogNonFatalError { - Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor + Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutorResponse } case Failure(e) => { logError(s"Cannot register with driver: $driverUrl", e) @@ -94,9 +91,9 @@ private[spark] class CoarseGrainedExecutorBackend( } override def receive: PartialFunction[Any, Unit] = { - case RegisteredExecutor => + case RegisteredExecutor(hostname) => logInfo("Successfully registered with driver") - val (hostname, _) = Utils.parseHostPort(hostPort) +// val (hostname, _) = Utils.parseHostPort(hostPort) executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) case RegisterExecutorFailed(message) => @@ -124,6 +121,11 @@ private[spark] 
class CoarseGrainedExecutorBackend( case StopExecutor => logInfo("Driver commanded a shutdown") + // Cannot shutdown here because an ack may need to be sent back to the caller. So send + // a message to self to actually do the shutdown. + self.send(Shutdown) + + case Shutdown => executor.stop() stop() rpcEnv.shutdown() @@ -172,7 +174,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { hostname, port, executorConf, - new SecurityManager(executorConf)) + new SecurityManager(executorConf), + clientMode=true) val driver = fetcher.setupEndpointRefByURI(driverUrl) val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++ Seq[(String, String)](("spark.app.id", appId)) @@ -197,12 +200,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) - // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore. - val boundPort = env.conf.getInt("spark.executor.port", 0) - assert(boundPort != 0) - - // Start the CoarseGrainedExecutorBackend endpoint. - val sparkHostPort = hostname + ":" + boundPort + // SparkEnv will set spark.executor.port if the rpc env is listening for incoming + // connections (e.g., if it's using akka). Otherwise, the executor is running in + // client mode only, and does not accept incoming connections. + val sparkHostPort = env.conf.getOption("spark.executor.port").map { port => + hostname + ":" + port + }.orNull env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) workerUrl.foreach { url => diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index e707e27d96b5..a5b9f142f1e7 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -182,6 +182,10 @@ private[spark] abstract class MemoryManager( * sun.misc.Unsafe. 
*/ final val tungstenMemoryMode: MemoryMode = { +//println("-- spark.memory.offHeap.enabled" + conf.getBoolean("spark.memory.offHeap.enabled", false)); +//println("-- spark.memory.offHeap.size" + conf.getSizeAsBytes("spark.memory.offHeap.size")); +//MemoryMode.OFF_HEAP + if (conf.getBoolean("spark.memory.offHeap.enabled", false)) { require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0, "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true") @@ -189,6 +193,7 @@ private[spark] abstract class MemoryManager( } else { MemoryMode.ON_HEAP } + } /** diff --git a/examples/pom.xml b/examples/pom.xml index f5ab2a7fdc09..d6aa6af5153a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -41,6 +41,11 @@ ${project.version} provided + + org.apache.mahout + mahout-core + 0.9 + org.apache.spark spark-streaming_${scala.binary.version} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index 380d85d60e7b..4d5348bbfc2a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -18,13 +18,28 @@ // scalastyle:off println package org.apache.spark.examples.mllib -import org.apache.log4j.{Level, Logger} +import org.apache.log4j.{ Level, Logger } import scopt.OptionParser -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.clustering.KMeans +import org.apache.spark.{ SparkConf, SparkContext } +import org.apache.spark.mllib.clustering._ import org.apache.spark.mllib.linalg.Vectors +// han sampler import begin +import java.io._ +import org.apache.spark.storage._ +import java.lang.System +import java.util.Date +import java.util.concurrent._ +import java.text._ +import scala.util.Properties +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.hadoop.io.LongWritable +import org.apache.mahout.math.VectorWritable + +// han sampler import end + /** * An example k-means app. 
Run with * {{{ @@ -42,12 +57,21 @@ object DenseKMeans { import InitializationMode._ case class Params( - input: String = null, - k: Int = -1, - numIterations: Int = 10, - initializationMode: InitializationMode = Parallel) extends AbstractParams[Params] + input: String = null, + k: Int = -1, + numIterations: Int = 10, + + // han + storageLevel: String = "NONE", + minNumPartitions: Int = -1, + + initializationMode: InitializationMode = Parallel) extends AbstractParams[Params] def main(args: Array[String]) { + + // han + println("Application starts:\n" + System.nanoTime()) + val defaultParams = Params() val parser = new OptionParser[Params]("DenseKMeans") { @@ -56,12 +80,22 @@ object DenseKMeans { .required() .text(s"number of clusters, required") .action((x, c) => c.copy(k = x)) + + // han + opt[String]("storageLevel") + .required() + .text(s"storage level of RDD caching (NONE, MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY), required") + .action((x, c) => c.copy(storageLevel = x)) + opt[Int]("minNumPartitions") + .text(s"minimal number of partitions of the input file") + .action((x, c) => c.copy(minNumPartitions = x)) + opt[Int]("numIterations") .text(s"number of iterations, default: ${defaultParams.numIterations}") .action((x, c) => c.copy(numIterations = x)) opt[String]("initMode") .text(s"initialization mode (${InitializationMode.values.mkString(",")}), " + - s"default: ${defaultParams.initializationMode}") + s"default: ${defaultParams.initializationMode}") .action((x, c) => c.copy(initializationMode = InitializationMode.withName(x))) arg[String]("") .text("input paths to examples") @@ -74,17 +108,106 @@ object DenseKMeans { }.getOrElse { sys.exit(1) } + + // han + println("Application completes:\n" + System.nanoTime()) + } def run(params: Params) { val conf = new SparkConf().setAppName(s"DenseKMeans with $params") + + // mayuresh + //if (params.storageLevel=="MEMORY_ONLY_SER") { + // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + // conf.set("spark.kryo.registrationRequired", "true") + // conf.registerKryoClasses(Array(classOf[Array[java.lang.Double]],classOf[org.apache.spark.mllib.linalg.DenseVector])) + //} + val sc = new SparkContext(conf) - Logger.getRootLogger.setLevel(Level.WARN) + // han sampler 1 begin + val SAMPLING_PERIOD: Long = 10 + val TIMESTAMP_PERIOD: Long = 1000 + + var dateFormat: DateFormat = new SimpleDateFormat("hh:mm:ss") + + val dirname_application = Properties.envOrElse("SPARK_HOME", "/home/mayuresh/spark-1.5.1") + "/logs/" + sc.applicationId + val dir_application = new File(dirname_application) + if (!dir_application.exists()) + dir_application.mkdirs() + + val ex = new ScheduledThreadPoolExecutor(1) + val task = new Runnable { + var i: Long = 0 + override def run { + + // sc.getExecutorStorageStatus.filter(s => s.blockManagerId.host.contains("slave1")) + sc.getExecutorStorageStatus.foreach { + es => + val filename: String = dirname_application + "/sparkOutput_driver_" + sc.applicationId + "_" + es.blockManagerId + ".txt" + val file = new File(filename) + val writer = new FileWriter(file, true) + if (!file.exists()) { + file.createNewFile() + writer.write(sc.applicationId + "_" + es.blockManagerId + "\n") + writer.flush() + writer.close() + } + var s = es.memUsed.toString() + //println(s) + if (i % TIMESTAMP_PERIOD == 0) { + i = 0 + var time: String = dateFormat.format(new Date()) + s += "\t" + time + } + + writer.write(s + "\n") + writer.flush() + writer.close() + } + i = i + SAMPLING_PERIOD + } + } + val f = ex.scheduleAtFixedRate(task, 
0, SAMPLING_PERIOD, TimeUnit.MILLISECONDS) + // han sampler 1 end + + // han + // Logger.getRootLogger.setLevel(Level.WARN) + + // han + // val examples = sc.textFile(params.input).map { line => + // Vectors.dense(line.split(' ').map(_.toDouble)) + // }.cache() + + var storageLevel: StorageLevel = null + storageLevel = params.storageLevel match { + case "NONE" => StorageLevel.NONE + case "MEMORY_ONLY" => StorageLevel.MEMORY_ONLY + case "MEMORY_AND_DISK" => StorageLevel.MEMORY_AND_DISK + case "DISK_ONLY" => StorageLevel.DISK_ONLY + case "MEMORY_ONLY_SER" => StorageLevel.MEMORY_ONLY_SER + } - val examples = sc.textFile(params.input).map { line => + var examples: RDD[Vector] = null + /* + if (params.minNumPartitions < 0) + examples = sc.textFile(params.input).map { line => + // examples = sc.binaryFiles(params.input, 1).map { line => + Vectors.dense(line.split(' ').map(_.toDouble)) + }.persist(storageLevel) + else examples = sc.textFile(params.input, params.minNumPartitions).map { line => Vectors.dense(line.split(' ').map(_.toDouble)) - }.cache() + }.persist(storageLevel) + */ + + val data = sc.sequenceFile[LongWritable, VectorWritable](params.input) + examples = data.map { + case (k, v) => + var vector: Array[Double] = new Array[Double](v.get().size) + for (i <- 0 until v.get().size) vector(i) = v.get().get(i) + Vectors.dense(vector) + }.persist(storageLevel) val numExamples = examples.count() @@ -95,17 +218,36 @@ object DenseKMeans { case Parallel => KMeans.K_MEANS_PARALLEL } + // han + // val model = new KMeans() + // .setInitializationMode(initMode) + // .setK(params.k) + // .setMaxIterations(params.numIterations) + // .run(examples) val model = new KMeans() .setInitializationMode(initMode) .setK(params.k) .setMaxIterations(params.numIterations) + .setStorageLevel(storageLevel) .run(examples) val cost = model.computeCost(examples) println(s"Total cost = $cost.") + // han + // Save and load model + /* + val outputPath = "/kmeans-huge/output/" + sc.applicationId + model.save(sc, outputPath) + val sameModel = KMeansModel.load(sc, outputPath) + */ sc.stop() + + // han sampler 2 begin + f.cancel(true) + sys.exit(0) + // hand sampler 2 end } } // scalastyle:on println diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index e11132c76753..5a85d9ac942a 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -108,6 +108,7 @@ public static long allocateMemory(long size) { // mayuresh: hacky way of tracking offheap usage TOTAL_BYTES += size; ADDRESS_SIZE_MAP.put(address, size); +System.out.println("-- Adding to offheap: " + Thread.currentThread().getStackTrace()); return address; } @@ -115,17 +116,33 @@ public static void freeMemory(long address) { // mayuresh: hacky way of tracking offheap usage TOTAL_BYTES -= ADDRESS_SIZE_MAP.get(address); ADDRESS_SIZE_MAP.remove(address); +System.out.println("-- Removing from offheap: " + Thread.currentThread().getStackTrace()); _UNSAFE.freeMemory(address); } public static void copyMemory( Object src, long srcOffset, Object dst, long dstOffset, long length) { - while (length > 0) { - long size = Math.min(length, UNSAFE_COPY_THRESHOLD); - _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); - length -= size; - srcOffset += size; - dstOffset += size; + // Check if dstOffset is before or after srcOffset to determine if we should copy + // forward or backwards. 
This is necessary in case src and dst overlap. + if (dstOffset < srcOffset) { + while (length > 0) { + long size = Math.min(length, UNSAFE_COPY_THRESHOLD); + _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); + length -= size; + srcOffset += size; + dstOffset += size; + } + } else { + srcOffset += length; + dstOffset += length; + while (length > 0) { + long size = Math.min(length, UNSAFE_COPY_THRESHOLD); + srcOffset -= size; + dstOffset -= size; + _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size); + length -= size; + } + } }
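Usage notes for the instrumentation above (illustrative sketches, not part of the patch itself):

The PageRank changes thread a caller-supplied StorageLevel through the iteration loop in place of the hard-coded cache() calls and expose it via runUntilConvergence_storageLevel. A minimal driver sketch, assuming an edge-list input path and a MEMORY_AND_DISK level (both illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.graphx.lib.PageRank
    import org.apache.spark.storage.StorageLevel

    object PageRankStorageLevelExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("PageRankStorageLevelExample"))
        // GraphLoader.edgeListFile builds a Graph[Int, Int] from "src dst" pairs.
        val graph = GraphLoader.edgeListFile(sc, args(0))
        // The patched PageRank persists every intermediate rank graph with this level
        // instead of the default cache() (MEMORY_ONLY).
        val ranks = PageRank.runUntilConvergence_storageLevel(
          graph, tol = 0.001, resetProb = 0.15, storageLevel = StorageLevel.MEMORY_AND_DISK)
        ranks.vertices.take(5).foreach(println)
        sc.stop()
      }
    }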
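The KMeans changes follow the same pattern: the norms RDD and the k-means|| cost RDDs are persisted with the level passed through the new setStorageLevel setter, so callers of the patched class should always set it before run(). The patched DenseKMeans benchmark also switches its input from a text file to a sequence file of (LongWritable, VectorWritable) pairs, which is why examples/pom.xml now pulls in mahout-core. A sketch of the equivalent programmatic use; the submit line in the comment, the input path, and the parameter values are illustrative:

    // spark-submit --class org.apache.spark.examples.mllib.DenseKMeans <examples jar> \
    //   --k 100 --numIterations 10 --storageLevel MEMORY_AND_DISK <input>
    import org.apache.hadoop.io.LongWritable
    import org.apache.mahout.math.VectorWritable
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.storage.StorageLevel

    // Assumes a SparkContext named sc is already in scope.
    val examples = sc.sequenceFile[LongWritable, VectorWritable]("hdfs:///kmeans/input")
      .map { case (_, v) =>
        // Copy the Mahout vector into a dense MLlib vector, as the patched DenseKMeans does.
        Vectors.dense(Array.tabulate(v.get().size)(i => v.get().get(i)))
      }
      .persist(StorageLevel.MEMORY_AND_DISK)

    val model = new KMeans()
      .setK(100)
      .setMaxIterations(10)
      .setStorageLevel(StorageLevel.MEMORY_AND_DISK) // required with this patch: KMeans persists with it directly
      .run(examples)
    println(s"cost = ${model.computeCost(examples)}")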
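The driver-side sampler embedded in DenseKMeans.run interleaves file handling with the benchmark code; the same idea condensed into a small helper is sketched below for reuse in the other benchmark drivers. The helper name, directory handling, and default period are illustrative; the Spark APIs it uses (getExecutorStorageStatus, memUsed, blockManagerId) are the ones the patch already relies on:

    import java.io.{File, FileWriter}
    import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
    import org.apache.spark.SparkContext

    // Append each executor's cached-memory usage to a per-block-manager file
    // every periodMs milliseconds, mirroring the sampler in DenseKMeans.
    def startMemorySampler(sc: SparkContext, dir: String, periodMs: Long = 10): ScheduledFuture[_] = {
      new File(dir).mkdirs()
      val ex = Executors.newSingleThreadScheduledExecutor()
      ex.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = {
          sc.getExecutorStorageStatus.foreach { es =>
            val file = new File(dir, s"memUsed_${sc.applicationId}_${es.blockManagerId}.txt")
            val writer = new FileWriter(file, true)
            try writer.write(es.memUsed + "\n") finally writer.close()
          }
        }
      }, 0, periodMs, TimeUnit.MILLISECONDS)
    }

    // val handle = startMemorySampler(sc, s"${sys.env("SPARK_HOME")}/logs/${sc.applicationId}")
    // ... run the benchmark ...
    // handle.cancel(true)   // as DenseKMeans does with f.cancel(true)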
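The off-heap accounting added to Platform only shows activity when Tungsten actually allocates off-heap memory, which MemoryManager.tungstenMemoryMode gates on the two configuration keys checked above. A sketch of enabling it and polling the new TOTAL_BYTES counter; the interval and output are illustrative, and the reading is best-effort since TOTAL_BYTES is updated with a non-atomic +=, in keeping with the author's own "hacky" caveat:

    import org.apache.spark.SparkConf
    import org.apache.spark.unsafe.Platform

    val conf = new SparkConf()
      .set("spark.memory.offHeap.enabled", "true") // checked in MemoryManager.tungstenMemoryMode
      .set("spark.memory.offHeap.size", "1g")      // must be > 0 when off-heap is enabled

    // Poll the counter added to Platform in this patch.
    val poller = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (true) {
            println(s"offheap bytes in use: ${Platform.TOTAL_BYTES}")
            Thread.sleep(10)
          }
        } catch { case _: InterruptedException => () }
      }
    })
    poller.setDaemon(true)
    poller.start()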
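Finally, the first patch collapsed Platform.copyMemory into a single forward loop and the second patch restores the upstream direction-aware version. The direction check matters whenever the source and destination ranges overlap with dstOffset greater than srcOffset: a forward chunked copy re-reads bytes that an earlier chunk has already overwritten, while copying the chunks back-to-front does not. A self-contained illustration with a plain array, where a small chunk size stands in for UNSAFE_COPY_THRESHOLD (System.arraycopy is overlap-safe within a single call, so the corruption below comes purely from the chunking, which is exactly the situation in copyMemory):

    val chunk = 2 // stand-in for UNSAFE_COPY_THRESHOLD

    def forwardCopy(a: Array[Byte], src: Int, dst: Int, len: Int): Unit = {
      var (s, d, n) = (src, dst, len)
      while (n > 0) {
        val size = math.min(n, chunk)
        System.arraycopy(a, s, a, d, size)
        s += size; d += size; n -= size
      }
    }

    def backwardCopy(a: Array[Byte], src: Int, dst: Int, len: Int): Unit = {
      var (s, d, n) = (src + len, dst + len, len)
      while (n > 0) {
        val size = math.min(n, chunk)
        s -= size; d -= size
        System.arraycopy(a, s, a, d, size)
        n -= size
      }
    }

    val a1 = Array[Byte](1, 2, 3, 4, 5, 6)
    forwardCopy(a1, 0, 2, 4)  // a1 is now 1,2,1,2,1,2: the second chunk read already-overwritten data
    val a2 = Array[Byte](1, 2, 3, 4, 5, 6)
    backwardCopy(a2, 0, 2, 4) // a2 is now 1,2,1,2,3,4: the intended overlapping shift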