From 8131d284dd7a718dd4fbbf31d3cadf6a3195680a Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 9 Oct 2014 19:21:26 +0900 Subject: [PATCH 1/5] Added SparkSpaceBeforeLeftBraceChecker to check spaces before "{" --- .../SparkSpaceBeforeLeftBraceChecker.scala | 69 +++++++++++++++++++ scalastyle-config.xml | 1 + 2 files changed, 70 insertions(+) create mode 100644 project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceBeforeLeftBraceChecker.scala diff --git a/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceBeforeLeftBraceChecker.scala b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceBeforeLeftBraceChecker.scala new file mode 100644 index 0000000000000..a08e576e9ba20 --- /dev/null +++ b/project/spark-style/src/main/scala/org/apache/spark/scalastyle/SparkSpaceBeforeLeftBraceChecker.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scalastyle + +import org.scalastyle.{PositionError, ScalariformChecker, ScalastyleError} +import scala.collection.mutable.{ListBuffer, Queue} +import scalariform.lexer.{Token, Tokens} +import scalariform.lexer.Tokens._ +import scalariform.parser.CompilationUnit + +class SparkSpaceBeforeLeftBraceChecker extends ScalariformChecker { + val errorKey: String = "insert.a.single.space.before.left.brace" + + val rememberQueue: Queue[Token] = Queue[Token]() + + // The list of disallowed tokens before left brace without single space. + val disallowedTokensBeforeLBrace = Seq ( + ARROW, ELSE, OP, RPAREN, TRY, MATCH, NEW, DO, FINALLY, PACKAGE, RETURN, THROW, YIELD, VARID + ) + + override def verify(ast: CompilationUnit): List[ScalastyleError] = { + + var list: ListBuffer[ScalastyleError] = new ListBuffer[ScalastyleError] + + for (token <- ast.tokens) { + rememberToken(token) + if (isLBrace(token) && + isTokenAfterSpecificTokens(token) && + !hasSingleWhiteSpaceBefore(token)) { + list += new PositionError(token.offset) + } + } + list.toList + } + + private def rememberToken(x: Token) = { + rememberQueue.enqueue(x) + if (rememberQueue.size > 2) { + rememberQueue.dequeue + } + x + } + + private def isTokenAfterSpecificTokens(x: Token) = { + val previousToken = rememberQueue.head + disallowedTokensBeforeLBrace.contains(previousToken.tokenType) + } + + private def isLBrace(x: Token) = + x.tokenType == Tokens.LBRACE + + private def hasSingleWhiteSpaceBefore(x: Token) = + x.associatedWhitespaceAndComments.whitespaces.size == 1 +} diff --git a/scalastyle-config.xml b/scalastyle-config.xml index c54f8b72ebf42..1edf8c434933e 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -142,4 +142,5 @@ + From 4014be060ddf09de2e974a716d3763050a8597bd Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 11 Oct 2014 14:44:13 +0900 Subject: [PATCH 2/5] Fixed styles --- .../scala/org/apache/spark/SparkConf.scala | 6 +- .../apache/spark/api/java/JavaPairRDD.scala | 24 +- .../spark/api/python/PythonHadoopUtil.scala | 2 +- .../WriteInputFormatTestDataGenerator.scala | 29 +- .../spark/deploy/worker/DriverRunner.scala | 484 +++++++++--------- .../executor/ExecutorURLClassLoader.scala | 2 +- .../apache/spark/network/nio/Connection.scala | 2 +- .../org/apache/spark/rdd/CoalescedRDD.scala | 2 +- .../org/apache/spark/rdd/HadoopRDD.scala | 2 +- .../scala/org/apache/spark/rdd/JdbcRDD.scala | 2 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 4 +- .../spark/scheduler/InputFormatInfo.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../shuffle/sort/SortShuffleManager.scala | 166 +++--- .../apache/spark/storage/BlockManagerId.scala | 2 +- .../apache/spark/ui/UIWorkloadGenerator.scala | 4 +- .../org/apache/spark/ui/jobs/StagePage.scala | 2 +- .../org/apache/spark/util/Distribution.scala | 6 +- .../spark/examples/DriverSubmissionTest.scala | 4 +- .../apache/spark/examples/LocalFileLR.scala | 2 +- .../apache/spark/examples/LocalKMeans.scala | 2 +- .../org/apache/spark/examples/LocalLR.scala | 4 +- .../org/apache/spark/examples/LogQuery.scala | 2 +- .../apache/spark/examples/SparkHdfsLR.scala | 194 +++---- .../apache/spark/examples/SparkKMeans.scala | 2 +- .../org/apache/spark/examples/SparkLR.scala | 4 +- .../apache/spark/examples/SparkPageRank.scala | 9 +- .../spark/examples/SparkTachyonHdfsLR.scala | 160 +++--- .../spark/examples/graphx/Analytics.scala | 328 ++++++------ .../spark/examples/mllib/MovieLensALS.scala | 2 +- .../examples/streaming/ActorWordCount.scala | 2 +- .../streaming/TwitterPopularTags.scala | 8 +- .../clickstream/PageViewStream.scala | 2 +- .../lib/StronglyConnectedComponents.scala | 2 +- .../spark/graphx/util/GraphGenerators.scala | 11 +- .../org/apache/spark/mllib/linalg/BLAS.scala | 22 +- .../apache/spark/mllib/linalg/Matrices.scala | 4 +- .../spark/mllib/recommendation/ALS.scala | 8 +- .../MatrixFactorizationModel.scala | 2 +- .../apache/spark/mllib/tree/model/Node.scala | 2 +- .../SparkSpaceBeforeLeftBraceChecker.scala | 2 +- .../spark/repl/SparkRunnerSettings.scala | 2 +- .../org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../streaming/api/java/JavaPairDStream.scala | 36 +- .../streaming/receiver/ActorReceiver.scala | 2 +- .../spark/streaming/util/RawTextHelper.scala | 2 +- 46 files changed, 792 insertions(+), 773 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 605df0e929faa..05d1a1d380e68 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -187,8 +187,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { /** Get all executor environment variables set on this SparkConf */ def getExecutorEnv: Seq[(String, String)] = { val prefix = "spark.executorEnv." - getAll.filter{case (k, v) => k.startsWith(prefix)} - .map{case (k, v) => (k.substring(prefix.length), v)} + getAll.filter {case (k, v) => k.startsWith(prefix)} + .map {case (k, v) => (k.substring(prefix.length), v)} } /** Get all akka conf variables set on this SparkConf */ @@ -311,7 +311,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * configuration out for debugging. */ def toDebugString: String = { - settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n") + settings.toArray.sorted.map {case (k, v) => k + "=" + v}.mkString("\n") } } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 0846225e4f992..f23d9254a2b55 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -454,7 +454,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (V, Optional[W])] = { val joinResult = rdd.leftOuterJoin(other, partitioner) - fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) + fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))}) } /** @@ -466,7 +466,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], W)] = { val joinResult = rdd.rightOuterJoin(other, partitioner) - fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) + fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)}) } /** @@ -480,8 +480,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], Optional[W])] = { val joinResult = rdd.fullOuterJoin(other, partitioner) - fromRDD(joinResult.mapValues{ case (v, w) => - (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + fromRDD(joinResult.mapValues { + case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) }) } @@ -541,7 +541,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { val joinResult = rdd.leftOuterJoin(other) - fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) + fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))}) } /** @@ -553,7 +553,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) : JavaPairRDD[K, (V, Optional[W])] = { val joinResult = rdd.leftOuterJoin(other, numPartitions) - fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) + fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))}) } /** @@ -564,7 +564,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) */ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { val joinResult = rdd.rightOuterJoin(other) - fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) + fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)}) } /** @@ -576,7 +576,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) : JavaPairRDD[K, (Optional[V], W)] = { val joinResult = rdd.rightOuterJoin(other, numPartitions) - fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) + fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)}) } /** @@ -590,8 +590,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) */ def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = { val joinResult = rdd.fullOuterJoin(other) - fromRDD(joinResult.mapValues{ case (v, w) => - (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + fromRDD(joinResult.mapValues { + case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) }) } @@ -606,8 +606,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) : JavaPairRDD[K, (Optional[V], Optional[W])] = { val joinResult = rdd.fullOuterJoin(other, numPartitions) - fromRDD(joinResult.mapValues{ case (v, w) => - (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) + fromRDD(joinResult.mapValues { + case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w)) }) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index 49dc95f349eac..6b3ed9339fff3 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -160,7 +160,7 @@ private[python] object PythonHadoopUtil { def mapToConf(map: java.util.Map[String, String]): Configuration = { import collection.JavaConversions._ val conf = new Configuration() - map.foreach{ case (k, v) => conf.set(k, v) } + map.foreach { case (k, v) => conf.set(k, v) } conf } diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala index d11db978b842e..7e7ad7e7fa2fb 100644 --- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -134,14 +134,14 @@ object WriteInputFormatTestDataGenerator { */ val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa")) sc.parallelize(intKeys).saveAsSequenceFile(intPath) - sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath) - sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath) - sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) } + sc.parallelize(intKeys.map { case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath) + sc.parallelize(intKeys.map { case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath) + sc.parallelize(intKeys.map { case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) } ).saveAsSequenceFile(bytesPath) val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false)) sc.parallelize(bools).saveAsSequenceFile(boolPath) - sc.parallelize(intKeys).map{ case (k, v) => - (new IntWritable(k), NullWritable.get()) + sc.parallelize(intKeys).map { + case (k, v) => (new IntWritable(k), NullWritable.get()) }.saveAsSequenceFile(nullPath) // Create test data for ArrayWritable @@ -150,8 +150,8 @@ object WriteInputFormatTestDataGenerator { (2, Array(3.0, 4.0, 5.0)), (3, Array(4.0, 5.0, 6.0)) ) - sc.parallelize(data, numSlices = 2) - .map{ case (k, v) => + sc.parallelize(data, numSlices = 2).map { + case (k, v) => val va = new DoubleArrayWritable va.set(v.map(new DoubleWritable(_))) (new IntWritable(k), va) @@ -165,12 +165,13 @@ object WriteInputFormatTestDataGenerator { (2, Map(1.0 -> "aa")), (1, Map(3.0 -> "bb")) ) - sc.parallelize(mapData, numSlices = 2).map{ case (i, m) => - val mw = new MapWritable() - m.foreach { case (k, v) => - mw.put(new DoubleWritable(k), new Text(v)) - } - (new IntWritable(i), mw) + sc.parallelize(mapData, numSlices = 2).map { + case (i, m) => + val mw = new MapWritable() + m.foreach { case (k, v) => + mw.put(new DoubleWritable(k), new Text(v)) + } + (new IntWritable(i), mw) }.saveAsSequenceFile(mapPath) // Create test data for arbitrary custom writable TestWritable @@ -181,7 +182,7 @@ object WriteInputFormatTestDataGenerator { ("3", TestWritable("test56", 456, 423.5)), ("2", TestWritable("test2", 123, 5435.2)) ) - val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) } + val rdd = sc.parallelize(testClass, numSlices = 2).map { case (k, v) => (new Text(k), v) } rdd.saveAsNewAPIHadoopFile(classPath, classOf[Text], classOf[TestWritable], classOf[SequenceFileOutputFormat[Text, TestWritable]]) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 9f9911762505a..ad16aa57c799d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -1,242 +1,242 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.worker - -import java.io._ - -import scala.collection.JavaConversions._ -import scala.collection.Map - -import akka.actor.ActorRef -import com.google.common.base.Charsets -import com.google.common.io.Files -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileUtil, Path} - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil} -import org.apache.spark.deploy.DeployMessages.DriverStateChanged -import org.apache.spark.deploy.master.DriverState -import org.apache.spark.deploy.master.DriverState.DriverState - -/** - * Manages the execution of one driver, including automatically restarting the driver on failure. - * This is currently only used in standalone cluster deploy mode. - */ -private[spark] class DriverRunner( - val conf: SparkConf, - val driverId: String, - val workDir: File, - val sparkHome: File, - val driverDesc: DriverDescription, - val worker: ActorRef, - val workerUrl: String) - extends Logging { - - @volatile var process: Option[Process] = None - @volatile var killed = false - - // Populated once finished - var finalState: Option[DriverState] = None - var finalException: Option[Exception] = None - var finalExitCode: Option[Int] = None - - // Decoupled for testing - private[deploy] def setClock(_clock: Clock) = clock = _clock - private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper - private var clock = new Clock { - def currentTimeMillis(): Long = System.currentTimeMillis() - } - private var sleeper = new Sleeper { - def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed}) - } - - /** Starts a thread to run and manage the driver. */ - def start() = { - new Thread("DriverRunner for " + driverId) { - override def run() { - try { - val driverDir = createWorkingDirectory() - val localJarFilename = downloadUserJar(driverDir) - - // Make sure user application jar is on the classpath - // TODO: If we add ability to submit multiple jars they should also be added here - val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename") - val newCommand = Command( - driverDesc.command.mainClass, - driverDesc.command.arguments.map(substituteVariables), - driverDesc.command.environment, - classPath, - driverDesc.command.libraryPathEntries, - driverDesc.command.javaOpts) - val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, - sparkHome.getAbsolutePath) - launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise) - } - catch { - case e: Exception => finalException = Some(e) - } - - val state = - if (killed) { - DriverState.KILLED - } else if (finalException.isDefined) { - DriverState.ERROR - } else { - finalExitCode match { - case Some(0) => DriverState.FINISHED - case _ => DriverState.FAILED - } - } - - finalState = Some(state) - - worker ! DriverStateChanged(driverId, state, finalException) - } - }.start() - } - - /** Terminate this driver (or prevent it from ever starting if not yet started) */ - def kill() { - synchronized { - process.foreach(p => p.destroy()) - killed = true - } - } - - /** Replace variables in a command argument passed to us */ - private def substituteVariables(argument: String): String = argument match { - case "{{WORKER_URL}}" => workerUrl - case other => other - } - - /** - * Creates the working directory for this driver. - * Will throw an exception if there are errors preparing the directory. - */ - private def createWorkingDirectory(): File = { - val driverDir = new File(workDir, driverId) - if (!driverDir.exists() && !driverDir.mkdirs()) { - throw new IOException("Failed to create directory " + driverDir) - } - driverDir - } - - /** - * Download the user jar into the supplied directory and return its local path. - * Will throw an exception if there are errors downloading the jar. - */ - private def downloadUserJar(driverDir: File): String = { - - val jarPath = new Path(driverDesc.jarUrl) - - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - val jarFileSystem = jarPath.getFileSystem(hadoopConf) - - val destPath = new File(driverDir.getAbsolutePath, jarPath.getName) - val jarFileName = jarPath.getName - val localJarFile = new File(driverDir, jarFileName) - val localJarFilename = localJarFile.getAbsolutePath - - if (!localJarFile.exists()) { // May already exist if running multiple workers on one node - logInfo(s"Copying user jar $jarPath to $destPath") - FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf) - } - - if (!localJarFile.exists()) { // Verify copy succeeded - throw new Exception(s"Did not see expected jar $jarFileName in $driverDir") - } - - localJarFilename - } - - private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File, - supervise: Boolean) { - val builder = new ProcessBuilder(command: _*).directory(baseDir) - envVars.map{ case(k,v) => builder.environment().put(k, v) } - - def initialize(process: Process) = { - // Redirect stdout and stderr to files - val stdout = new File(baseDir, "stdout") - CommandUtils.redirectStream(process.getInputStream, stdout) - - val stderr = new File(baseDir, "stderr") - val header = "Launch Command: %s\n%s\n\n".format( - command.mkString("\"", "\" \"", "\""), "=" * 40) - Files.append(header, stderr, Charsets.UTF_8) - CommandUtils.redirectStream(process.getErrorStream, stderr) - } - runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise) - } - - private[deploy] def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit, - supervise: Boolean) { - // Time to wait between submission retries. - var waitSeconds = 1 - // A run of this many seconds resets the exponential back-off. - val successfulRunDuration = 5 - - var keepTrying = !killed - - while (keepTrying) { - logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\"")) - - synchronized { - if (killed) { return } - process = Some(command.start()) - initialize(process.get) - } - - val processStart = clock.currentTimeMillis() - val exitCode = process.get.waitFor() - if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) { - waitSeconds = 1 - } - - if (supervise && exitCode != 0 && !killed) { - logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.") - sleeper.sleep(waitSeconds) - waitSeconds = waitSeconds * 2 // exponential back-off - } - - keepTrying = supervise && exitCode != 0 && !killed - finalExitCode = Some(exitCode) - } - } -} - -private[deploy] trait Clock { - def currentTimeMillis(): Long -} - -private[deploy] trait Sleeper { - def sleep(seconds: Int) -} - -// Needed because ProcessBuilder is a final class and cannot be mocked -private[deploy] trait ProcessBuilderLike { - def start(): Process - def command: Seq[String] -} - -private[deploy] object ProcessBuilderLike { - def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike { - def start() = processBuilder.start() - def command = processBuilder.command() - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.worker + +import java.io._ + +import scala.collection.JavaConversions._ +import scala.collection.Map + +import akka.actor.ActorRef +import com.google.common.base.Charsets +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileUtil, Path} + +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil} +import org.apache.spark.deploy.DeployMessages.DriverStateChanged +import org.apache.spark.deploy.master.DriverState +import org.apache.spark.deploy.master.DriverState.DriverState + +/** + * Manages the execution of one driver, including automatically restarting the driver on failure. + * This is currently only used in standalone cluster deploy mode. + */ +private[spark] class DriverRunner( + val conf: SparkConf, + val driverId: String, + val workDir: File, + val sparkHome: File, + val driverDesc: DriverDescription, + val worker: ActorRef, + val workerUrl: String) + extends Logging { + + @volatile var process: Option[Process] = None + @volatile var killed = false + + // Populated once finished + var finalState: Option[DriverState] = None + var finalException: Option[Exception] = None + var finalExitCode: Option[Int] = None + + // Decoupled for testing + private[deploy] def setClock(_clock: Clock) = clock = _clock + private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper + private var clock = new Clock { + def currentTimeMillis(): Long = System.currentTimeMillis() + } + private var sleeper = new Sleeper { + def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed}) + } + + /** Starts a thread to run and manage the driver. */ + def start() = { + new Thread("DriverRunner for " + driverId) { + override def run() { + try { + val driverDir = createWorkingDirectory() + val localJarFilename = downloadUserJar(driverDir) + + // Make sure user application jar is on the classpath + // TODO: If we add ability to submit multiple jars they should also be added here + val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename") + val newCommand = Command( + driverDesc.command.mainClass, + driverDesc.command.arguments.map(substituteVariables), + driverDesc.command.environment, + classPath, + driverDesc.command.libraryPathEntries, + driverDesc.command.javaOpts) + val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, + sparkHome.getAbsolutePath) + launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise) + } + catch { + case e: Exception => finalException = Some(e) + } + + val state = + if (killed) { + DriverState.KILLED + } else if (finalException.isDefined) { + DriverState.ERROR + } else { + finalExitCode match { + case Some(0) => DriverState.FINISHED + case _ => DriverState.FAILED + } + } + + finalState = Some(state) + + worker ! DriverStateChanged(driverId, state, finalException) + } + }.start() + } + + /** Terminate this driver (or prevent it from ever starting if not yet started) */ + def kill() { + synchronized { + process.foreach(p => p.destroy()) + killed = true + } + } + + /** Replace variables in a command argument passed to us */ + private def substituteVariables(argument: String): String = argument match { + case "{{WORKER_URL}}" => workerUrl + case other => other + } + + /** + * Creates the working directory for this driver. + * Will throw an exception if there are errors preparing the directory. + */ + private def createWorkingDirectory(): File = { + val driverDir = new File(workDir, driverId) + if (!driverDir.exists() && !driverDir.mkdirs()) { + throw new IOException("Failed to create directory " + driverDir) + } + driverDir + } + + /** + * Download the user jar into the supplied directory and return its local path. + * Will throw an exception if there are errors downloading the jar. + */ + private def downloadUserJar(driverDir: File): String = { + + val jarPath = new Path(driverDesc.jarUrl) + + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val jarFileSystem = jarPath.getFileSystem(hadoopConf) + + val destPath = new File(driverDir.getAbsolutePath, jarPath.getName) + val jarFileName = jarPath.getName + val localJarFile = new File(driverDir, jarFileName) + val localJarFilename = localJarFile.getAbsolutePath + + if (!localJarFile.exists()) { // May already exist if running multiple workers on one node + logInfo(s"Copying user jar $jarPath to $destPath") + FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf) + } + + if (!localJarFile.exists()) { // Verify copy succeeded + throw new Exception(s"Did not see expected jar $jarFileName in $driverDir") + } + + localJarFilename + } + + private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File, + supervise: Boolean) { + val builder = new ProcessBuilder(command: _*).directory(baseDir) + envVars.map { case(k,v) => builder.environment().put(k, v) } + + def initialize(process: Process) = { + // Redirect stdout and stderr to files + val stdout = new File(baseDir, "stdout") + CommandUtils.redirectStream(process.getInputStream, stdout) + + val stderr = new File(baseDir, "stderr") + val header = "Launch Command: %s\n%s\n\n".format( + command.mkString("\"", "\" \"", "\""), "=" * 40) + Files.append(header, stderr, Charsets.UTF_8) + CommandUtils.redirectStream(process.getErrorStream, stderr) + } + runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise) + } + + private[deploy] def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit, + supervise: Boolean) { + // Time to wait between submission retries. + var waitSeconds = 1 + // A run of this many seconds resets the exponential back-off. + val successfulRunDuration = 5 + + var keepTrying = !killed + + while (keepTrying) { + logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\"")) + + synchronized { + if (killed) { return } + process = Some(command.start()) + initialize(process.get) + } + + val processStart = clock.currentTimeMillis() + val exitCode = process.get.waitFor() + if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) { + waitSeconds = 1 + } + + if (supervise && exitCode != 0 && !killed) { + logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.") + sleeper.sleep(waitSeconds) + waitSeconds = waitSeconds * 2 // exponential back-off + } + + keepTrying = supervise && exitCode != 0 && !killed + finalExitCode = Some(exitCode) + } + } +} + +private[deploy] trait Clock { + def currentTimeMillis(): Long +} + +private[deploy] trait Sleeper { + def sleep(seconds: Int) +} + +// Needed because ProcessBuilder is a final class and cannot be mocked +private[deploy] trait ProcessBuilderLike { + def start(): Process + def command: Seq[String] +} + +private[deploy] object ProcessBuilderLike { + def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike { + def start() = processBuilder.start() + def command = processBuilder.command() + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala index 218ed7b5d2d39..425e220888ac3 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala @@ -34,7 +34,7 @@ private[spark] trait MutableURLClassLoader extends ClassLoader { private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) extends MutableURLClassLoader { - private object userClassLoader extends URLClassLoader(urls, null){ + private object userClassLoader extends URLClassLoader(urls, null) { override def addURL(url: URL) { super.addURL(url) } diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala index 4f6f5e235811d..623cc0ab99dca 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala @@ -323,7 +323,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // MUST be called within the selector loop def connect() { - try{ + try { channel.register(selector, SelectionKey.OP_CONNECT) channel.connect(address) logInfo("Initiating connection to [" + address + "]") diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 11ebafbf6d457..cbda76e8c1b50 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -316,7 +316,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc for(i <- 0 until maxPartitions) { val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt - (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) } + (rangeStart until rangeEnd).foreach(j => groupArr(i).arr += prev.partitions(j)) } } } else { diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 6b63eb23e9ee1..f20fbcbfdeb8c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -200,7 +200,7 @@ class HadoopRDD[K, V]( reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) // Register an on-task-completion callback to close the input stream. - context.addTaskCompletionListener{ context => closeIfNeeded() } + context.addTaskCompletionListener(context => closeIfNeeded()) val key: K = reader.createKey() val value: V = reader.createValue() diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 0e38f224ac81d..d5a3164faac6e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -68,7 +68,7 @@ class JdbcRDD[T: ClassTag]( } override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { - context.addTaskCompletionListener{ context => closeIfNeeded() } + context.addTaskCompletionListener(context => closeIfNeeded()) val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 2aba40d152e3e..4e9e8033723c2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1357,7 +1357,7 @@ abstract class RDD[T: ClassTag]( val leftOffset = (partitionStr.length - 1) / 2 val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)) - debugSelf(rdd).zipWithIndex.map{ + debugSelf(rdd).zipWithIndex.map { case (desc: String, 0) => s"$partitionStr $desc" case (desc: String, _) => s"$nextPrefix $desc" } ++ debugChildren(rdd, nextPrefix) @@ -1371,7 +1371,7 @@ abstract class RDD[T: ClassTag]( + (if (isLastChild) " " else "| ") + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))) - debugSelf(rdd).zipWithIndex.map{ + debugSelf(rdd).zipWithIndex.map { case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc" case (desc: String, _) => s"$nextPrefix$desc" } ++ debugChildren(rdd, nextPrefix) diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index bac37bfdaa23f..94e2e3bd47f93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -173,7 +173,7 @@ object InputFormatInfo { for (inputSplit <- formats) { val splits = inputSplit.findPreferredLocations() - for (split <- splits){ + for (split <- splits) { val location = split.hostLocation val set = nodeToSplit.getOrElseUpdate(location, new HashSet[SplitInfo]) set += split diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6d697e3d003f6..5c0b7b5b963c0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -526,7 +526,7 @@ private[spark] object TaskSchedulerImpl { val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null) assert(containerList != null) // Get the index'th entry for this host - if present - if (index < containerList.size){ + if (index < containerList.size) { retval += containerList.apply(index) found = true } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index b727438ae7e47..637ffab3c0f77 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -1,83 +1,83 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.sort - -import java.util.concurrent.ConcurrentHashMap - -import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency} -import org.apache.spark.shuffle._ -import org.apache.spark.shuffle.hash.HashShuffleReader - -private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { - - private val indexShuffleBlockManager = new IndexShuffleBlockManager() - private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() - - /** - * Register a shuffle with the manager and obtain a handle for it to pass to tasks. - */ - override def registerShuffle[K, V, C]( - shuffleId: Int, - numMaps: Int, - dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { - new BaseShuffleHandle(shuffleId, numMaps, dependency) - } - - /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). - * Called on executors by reduce tasks. - */ - override def getReader[K, C]( - handle: ShuffleHandle, - startPartition: Int, - endPartition: Int, - context: TaskContext): ShuffleReader[K, C] = { - // We currently use the same block store shuffle fetcher as the hash-based shuffle. - new HashShuffleReader( - handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) - } - - /** Get a writer for a given partition. Called on executors by map tasks. */ - override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) - : ShuffleWriter[K, V] = { - val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] - shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps) - new SortShuffleWriter( - shuffleBlockManager, baseShuffleHandle, mapId, context) - } - - /** Remove a shuffle's metadata from the ShuffleManager. */ - override def unregisterShuffle(shuffleId: Int): Boolean = { - if (shuffleMapNumber.containsKey(shuffleId)) { - val numMaps = shuffleMapNumber.remove(shuffleId) - (0 until numMaps).map{ mapId => - shuffleBlockManager.removeDataByMap(shuffleId, mapId) - } - } - true - } - - override def shuffleBlockManager: IndexShuffleBlockManager = { - indexShuffleBlockManager - } - - /** Shut down this ShuffleManager. */ - override def stop(): Unit = { - shuffleBlockManager.stop() - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency} +import org.apache.spark.shuffle._ +import org.apache.spark.shuffle.hash.HashShuffleReader + +private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { + + private val indexShuffleBlockManager = new IndexShuffleBlockManager() + private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() + + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { + new BaseShuffleHandle(shuffleId, numMaps, dependency) + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { + // We currently use the same block store shuffle fetcher as the hash-based shuffle. + new HashShuffleReader( + handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) + : ShuffleWriter[K, V] = { + val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] + shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps) + new SortShuffleWriter( + shuffleBlockManager, baseShuffleHandle, mapId, context) + } + + /** Remove a shuffle's metadata from the ShuffleManager. */ + override def unregisterShuffle(shuffleId: Int): Boolean = { + if (shuffleMapNumber.containsKey(shuffleId)) { + val numMaps = shuffleMapNumber.remove(shuffleId) + (0 until numMaps).map { mapId => + shuffleBlockManager.removeDataByMap(shuffleId, mapId) + } + } + true + } + + override def shuffleBlockManager: IndexShuffleBlockManager = { + indexShuffleBlockManager + } + + /** Shut down this ShuffleManager. */ + override def stop(): Unit = { + shuffleBlockManager.stop() + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 142285094342c..506f73c4e4ebf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -43,7 +43,7 @@ class BlockManagerId private ( def executorId: String = executorId_ - if (null != host_){ + if (null != host_) { Utils.checkHost(host_, "Expected hostname") assert (port_ > 0) } diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 18d2b5075aa08..09eb0304ef2de 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -64,7 +64,7 @@ private[spark] object UIWorkloadGenerator { ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count), ("Entirely failed phase", baseData.map(x => throw new Exception).count), ("Partially failed phase", { - baseData.map{x => + baseData.map {x => val probFailure = (4.0 / NUM_PARTITIONS) if (nextFloat() < probFailure) { throw new Exception("This is a task failure") @@ -73,7 +73,7 @@ private[spark] object UIWorkloadGenerator { }.count }), ("Partially failed phase (longer tasks)", { - baseData.map{x => + baseData.map {x => val probFailure = (4.0 / NUM_PARTITIONS) if (nextFloat() < probFailure) { Thread.sleep(100) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 2414e4c65237e..a2599ade43d38 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -292,7 +292,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {Unparsed( - info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("
") + info.accumulables.map(acc => s"${acc.name}: ${acc.update.get}").mkString("
") )}