diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 605df0e929faa..05d1a1d380e68 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -187,8 +187,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
-    getAll.filter{case (k, v) => k.startsWith(prefix)}
-          .map{case (k, v) => (k.substring(prefix.length), v)}
+    getAll.filter {case (k, v) => k.startsWith(prefix)}
+          .map {case (k, v) => (k.substring(prefix.length), v)}
   }
 
   /** Get all akka conf variables set on this SparkConf */
@@ -311,7 +311,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    * configuration out for debugging.
    */
   def toDebugString: String = {
-    settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+    settings.toArray.sorted.map {case (k, v) => k + "=" + v}.mkString("\n")
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0846225e4f992..f23d9254a2b55 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -454,7 +454,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
   : JavaPairRDD[K, (V, Optional[W])] = {
     val joinResult = rdd.leftOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+    fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))})
   }
 
   /**
@@ -466,7 +466,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
   : JavaPairRDD[K, (Optional[V], W)] = {
     val joinResult = rdd.rightOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+    fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
   /**
@@ -480,8 +480,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
   : JavaPairRDD[K, (Optional[V], Optional[W])] = {
     val joinResult = rdd.fullOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{ case (v, w) =>
-      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    fromRDD(joinResult.mapValues {
+      case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
     })
   }
 
@@ -541,7 +541,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
     val joinResult = rdd.leftOuterJoin(other)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+    fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))})
   }
 
   /**
@@ -553,7 +553,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
   : JavaPairRDD[K, (V, Optional[W])] = {
     val joinResult = rdd.leftOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+    fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))})
   }
 
   /**
@@ -564,7 +564,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
     val joinResult = rdd.rightOuterJoin(other)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+    fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
   /**
@@ -576,7 +576,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
   : JavaPairRDD[K, (Optional[V], W)] = {
     val joinResult = rdd.rightOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+    fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
   /**
@@ -590,8 +590,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    */
   def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
     val joinResult = rdd.fullOuterJoin(other)
-    fromRDD(joinResult.mapValues{ case (v, w) =>
-      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    fromRDD(joinResult.mapValues {
+      case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
     })
   }
 
@@ -606,8 +606,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
   : JavaPairRDD[K, (Optional[V], Optional[W])] = {
     val joinResult = rdd.fullOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{ case (v, w) =>
-      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    fromRDD(joinResult.mapValues {
+      case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
     })
   }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 49dc95f349eac..6b3ed9339fff3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -160,7 +160,7 @@ private[python] object PythonHadoopUtil {
   def mapToConf(map: java.util.Map[String, String]): Configuration = {
     import collection.JavaConversions._
     val conf = new Configuration()
-    map.foreach{ case (k, v) => conf.set(k, v) }
+    map.foreach { case (k, v) => conf.set(k, v) }
     conf
   }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index d11db978b842e..7e7ad7e7fa2fb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -134,14 +134,14 @@ object WriteInputFormatTestDataGenerator {
      */
     val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
     sc.parallelize(intKeys).saveAsSequenceFile(intPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+    sc.parallelize(intKeys.map { case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
+    sc.parallelize(intKeys.map { case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
+    sc.parallelize(intKeys.map { case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
       ).saveAsSequenceFile(bytesPath)
     val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
     sc.parallelize(bools).saveAsSequenceFile(boolPath)
-    sc.parallelize(intKeys).map{ case (k, v) =>
-      (new IntWritable(k), NullWritable.get())
+    sc.parallelize(intKeys).map {
+      case (k, v) => (new IntWritable(k), NullWritable.get())
     }.saveAsSequenceFile(nullPath)
 
     // Create test data for ArrayWritable
@@ -150,8 +150,8 @@ object WriteInputFormatTestDataGenerator {
       (2, Array(3.0, 4.0, 5.0)),
       (3, Array(4.0, 5.0, 6.0))
     )
-    sc.parallelize(data, numSlices = 2)
-      .map{ case (k, v) =>
+    sc.parallelize(data, numSlices = 2).map {
+      case (k, v) =>
         val va = new DoubleArrayWritable
         va.set(v.map(new DoubleWritable(_)))
         (new IntWritable(k), va)
@@ -165,12 +165,13 @@ object WriteInputFormatTestDataGenerator {
       (2, Map(1.0 -> "aa")),
       (1, Map(3.0 -> "bb"))
     )
-    sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
-      val mw = new MapWritable()
-      m.foreach { case (k, v) =>
-        mw.put(new DoubleWritable(k), new Text(v))
-      }
-      (new IntWritable(i), mw)
+    sc.parallelize(mapData, numSlices = 2).map {
+      case (i, m) =>
+        val mw = new MapWritable()
+        m.foreach { case (k, v) =>
+          mw.put(new DoubleWritable(k), new Text(v))
+        }
+        (new IntWritable(i), mw)
     }.saveAsSequenceFile(mapPath)
 
     // Create test data for arbitrary custom writable TestWritable
@@ -181,7 +182,7 @@ object WriteInputFormatTestDataGenerator {
       ("3", TestWritable("test56", 456, 423.5)),
      ("2", TestWritable("test2", 123, 5435.2))
     )
-    val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+    val rdd = sc.parallelize(testClass, numSlices = 2).map { case (k, v) => (new Text(k), v) }
     rdd.saveAsNewAPIHadoopFile(classPath, classOf[Text], classOf[TestWritable],
       classOf[SequenceFileOutputFormat[Text, TestWritable]])
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 9f9911762505a..8184f6a6faa82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -168,7 +168,7 @@ private[spark] class DriverRunner(
   private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File,
                            supervise: Boolean) {
     val builder = new ProcessBuilder(command: _*).directory(baseDir)
-    envVars.map{ case(k,v) => builder.environment().put(k, v) }
+    envVars.map { case(k,v) => builder.environment().put(k, v) }
 
     def initialize(process: Process) = {
       // Redirect stdout and stderr to files
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
index 218ed7b5d2d39..425e220888ac3 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala
@@ -34,7 +34,7 @@ private[spark] trait MutableURLClassLoader extends ClassLoader {
 private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
   extends MutableURLClassLoader {
 
-  private object userClassLoader extends URLClassLoader(urls, null){
+  private object userClassLoader extends URLClassLoader(urls, null) {
     override def addURL(url: URL) {
       super.addURL(url)
     }
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
index 4f6f5e235811d..623cc0ab99dca 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -323,7 +323,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
 
   // MUST be called within the selector loop
   def connect() {
-    try{
+    try {
       channel.register(selector, SelectionKey.OP_CONNECT)
       channel.connect(address)
       logInfo("Initiating connection to [" + address + "]")
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 11ebafbf6d457..cbda76e8c1b50 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -316,7 +316,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       for(i <- 0 until maxPartitions) {
         val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
         val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
-        (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
+        (rangeStart until rangeEnd).foreach(j => groupArr(i).arr += prev.partitions(j))
       }
     } else {
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6b63eb23e9ee1..f20fbcbfdeb8c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -200,7 +200,7 @@ class HadoopRDD[K, V](
       reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
       // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener{ context => closeIfNeeded() }
+      context.addTaskCompletionListener(context => closeIfNeeded())
 
       val key: K = reader.createKey()
       val value: V = reader.createValue()
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 0e38f224ac81d..d5a3164faac6e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -68,7 +68,7 @@ class JdbcRDD[T: ClassTag](
   }
 
   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
+    context.addTaskCompletionListener(context => closeIfNeeded())
     val part = thePart.asInstanceOf[JdbcPartition]
     val conn = getConnection()
     val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 2aba40d152e3e..4e9e8033723c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1357,7 +1357,7 @@ abstract class RDD[T: ClassTag](
       val leftOffset = (partitionStr.length - 1) / 2
       val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
 
-      debugSelf(rdd).zipWithIndex.map{
+      debugSelf(rdd).zipWithIndex.map {
         case (desc: String, 0) => s"$partitionStr $desc"
         case (desc: String, _) => s"$nextPrefix $desc"
       } ++ debugChildren(rdd, nextPrefix)
@@ -1371,7 +1371,7 @@ abstract class RDD[T: ClassTag](
         + (if (isLastChild) " " else "| ")
         + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
 
-      debugSelf(rdd).zipWithIndex.map{
+      debugSelf(rdd).zipWithIndex.map {
         case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
         case (desc: String, _) => s"$nextPrefix$desc"
       } ++ debugChildren(rdd, nextPrefix)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index bac37bfdaa23f..94e2e3bd47f93 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -173,7 +173,7 @@ object InputFormatInfo {
     for (inputSplit <- formats) {
       val splits = inputSplit.findPreferredLocations()
 
-      for (split <- splits){
+      for (split <- splits) {
         val location = split.hostLocation
         val set = nodeToSplit.getOrElseUpdate(location, new HashSet[SplitInfo])
         set += split
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6d697e3d003f6..5c0b7b5b963c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -526,7 +526,7 @@ private[spark] object TaskSchedulerImpl {
         val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
         assert(containerList != null)
         // Get the index'th entry for this host - if present
-        if (index < containerList.size){
+        if (index < containerList.size) {
           retval += containerList.apply(index)
           found = true
         }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index b727438ae7e47..e6622a49850ca 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -65,7 +65,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
   override def unregisterShuffle(shuffleId: Int): Boolean = {
     if (shuffleMapNumber.containsKey(shuffleId)) {
       val numMaps = shuffleMapNumber.remove(shuffleId)
-      (0 until numMaps).map{ mapId =>
+      (0 until numMaps).map { mapId =>
         shuffleBlockManager.removeDataByMap(shuffleId, mapId)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 142285094342c..506f73c4e4ebf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -43,7 +43,7 @@ class BlockManagerId private (
 
   def executorId: String = executorId_
 
-  if (null != host_){
+  if (null != host_) {
     Utils.checkHost(host_, "Expected hostname")
     assert (port_ > 0)
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 18d2b5075aa08..09eb0304ef2de 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -64,7 +64,7 @@ private[spark] object UIWorkloadGenerator {
     ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
     ("Entirely failed phase", baseData.map(x => throw new Exception).count),
     ("Partially failed phase", {
-      baseData.map{x =>
+      baseData.map {x =>
        val probFailure = (4.0 / NUM_PARTITIONS)
        if (nextFloat() < probFailure) {
          throw new Exception("This is a task failure")
@@ -73,7 +73,7 @@ private[spark] object UIWorkloadGenerator {
       }.count
     }),
     ("Partially failed phase (longer tasks)", {
-      baseData.map{x =>
+      baseData.map {x =>
        val probFailure = (4.0 / NUM_PARTITIONS)
        if (nextFloat() < probFailure) {
          Thread.sleep(100)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2414e4c65237e..a2599ade43d38 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -292,7 +292,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {