6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -187,8 +187,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
- getAll.filter{case (k, v) => k.startsWith(prefix)}
- .map{case (k, v) => (k.substring(prefix.length), v)}
+ getAll.filter {case (k, v) => k.startsWith(prefix)}
+ .map {case (k, v) => (k.substring(prefix.length), v)}
}

/** Get all akka conf variables set on this SparkConf */
@@ -311,7 +311,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* configuration out for debugging.
*/
def toDebugString: String = {
- settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+ settings.toArray.sorted.map {case (k, v) => k + "=" + v}.mkString("\n")
}
}

24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -454,7 +454,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other, partitioner)
- fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))})
}

/**
@@ -466,7 +466,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other, partitioner)
- fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
@@ -480,8 +480,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, partitioner)
- fromRDD(joinResult.mapValues{ case (v, w) =>
- (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ fromRDD(joinResult.mapValues {
+ case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

@@ -541,7 +541,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other)
- fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))})
}

/**
@@ -553,7 +553,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other, numPartitions)
- fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ fromRDD(joinResult.mapValues {case (v, w) => (v, JavaUtils.optionToOptional(w))})
}

/**
@@ -564,7 +564,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other)
- fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
@@ -576,7 +576,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other, numPartitions)
- fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ fromRDD(joinResult.mapValues {case (v, w) => (JavaUtils.optionToOptional(v), w)})
}

/**
@@ -590,8 +590,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*/
def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other)
- fromRDD(joinResult.mapValues{ case (v, w) =>
- (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ fromRDD(joinResult.mapValues {
+ case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

@@ -606,8 +606,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
: JavaPairRDD[K, (Optional[V], Optional[W])] = {
val joinResult = rdd.fullOuterJoin(other, numPartitions)
- fromRDD(joinResult.mapValues{ case (v, w) =>
- (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+ fromRDD(joinResult.mapValues {
+ case (v, w) => (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
})
}

@@ -160,7 +160,7 @@ private[python] object PythonHadoopUtil {
def mapToConf(map: java.util.Map[String, String]): Configuration = {
import collection.JavaConversions._
val conf = new Configuration()
- map.foreach{ case (k, v) => conf.set(k, v) }
+ map.foreach { case (k, v) => conf.set(k, v) }
conf
}

@@ -134,14 +134,14 @@ object WriteInputFormatTestDataGenerator {
*/
val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
- sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
- sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+ sc.parallelize(intKeys.map { case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
+ sc.parallelize(intKeys.map { case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
+ sc.parallelize(intKeys.map { case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
).saveAsSequenceFile(bytesPath)
val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
sc.parallelize(bools).saveAsSequenceFile(boolPath)
- sc.parallelize(intKeys).map{ case (k, v) =>
- (new IntWritable(k), NullWritable.get())
+ sc.parallelize(intKeys).map {
+ case (k, v) => (new IntWritable(k), NullWritable.get())
}.saveAsSequenceFile(nullPath)

// Create test data for ArrayWritable
@@ -150,8 +150,8 @@ object WriteInputFormatTestDataGenerator {
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
- sc.parallelize(data, numSlices = 2)
- .map{ case (k, v) =>
+ sc.parallelize(data, numSlices = 2).map {
+ case (k, v) =>
val va = new DoubleArrayWritable
va.set(v.map(new DoubleWritable(_)))
(new IntWritable(k), va)
@@ -165,12 +165,13 @@ object WriteInputFormatTestDataGenerator {
(2, Map(1.0 -> "aa")),
(1, Map(3.0 -> "bb"))
)
- sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
- val mw = new MapWritable()
- m.foreach { case (k, v) =>
- mw.put(new DoubleWritable(k), new Text(v))
- }
- (new IntWritable(i), mw)
+ sc.parallelize(mapData, numSlices = 2).map {
+ case (i, m) =>
+ val mw = new MapWritable()
+ m.foreach { case (k, v) =>
+ mw.put(new DoubleWritable(k), new Text(v))
+ }
+ (new IntWritable(i), mw)
}.saveAsSequenceFile(mapPath)

// Create test data for arbitrary custom writable TestWritable
@@ -181,7 +182,7 @@ object WriteInputFormatTestDataGenerator {
("3", TestWritable("test56", 456, 423.5)),
("2", TestWritable("test2", 123, 5435.2))
)
- val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+ val rdd = sc.parallelize(testClass, numSlices = 2).map { case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(classPath,
classOf[Text], classOf[TestWritable],
classOf[SequenceFileOutputFormat[Text, TestWritable]])
@@ -168,7 +168,7 @@ private[spark] class DriverRunner(
private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File,
supervise: Boolean) {
val builder = new ProcessBuilder(command: _*).directory(baseDir)
- envVars.map{ case(k,v) => builder.environment().put(k, v) }
+ envVars.map { case(k,v) => builder.environment().put(k, v) }

def initialize(process: Process) = {
// Redirect stdout and stderr to files
@@ -34,7 +34,7 @@ private[spark] trait MutableURLClassLoader extends ClassLoader {
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends MutableURLClassLoader {

- private object userClassLoader extends URLClassLoader(urls, null){
+ private object userClassLoader extends URLClassLoader(urls, null) {
override def addURL(url: URL) {
super.addURL(url)
}
@@ -323,7 +323,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

// MUST be called within the selector loop
def connect() {
- try{
+ try {
channel.register(selector, SelectionKey.OP_CONNECT)
channel.connect(address)
logInfo("Initiating connection to [" + address + "]")
@@ -316,7 +316,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
for(i <- 0 until maxPartitions) {
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
- (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
+ (rangeStart until rangeEnd).foreach(j => groupArr(i).arr += prev.partitions(j))
}
}
} else {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -200,7 +200,7 @@ class HadoopRDD[K, V](
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener{ context => closeIfNeeded() }
+ context.addTaskCompletionListener(context => closeIfNeeded())
val key: K = reader.createKey()
val value: V = reader.createValue()

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -68,7 +68,7 @@ class JdbcRDD[T: ClassTag](
}

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
- context.addTaskCompletionListener{ context => closeIfNeeded() }
+ context.addTaskCompletionListener(context => closeIfNeeded())
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1357,7 +1357,7 @@ abstract class RDD[T: ClassTag](
val leftOffset = (partitionStr.length - 1) / 2
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))

- debugSelf(rdd).zipWithIndex.map{
+ debugSelf(rdd).zipWithIndex.map {
case (desc: String, 0) => s"$partitionStr $desc"
case (desc: String, _) => s"$nextPrefix $desc"
} ++ debugChildren(rdd, nextPrefix)
@@ -1371,7 +1371,7 @@ abstract class RDD[T: ClassTag](
+ (if (isLastChild) " " else "| ")
+ (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))

- debugSelf(rdd).zipWithIndex.map{
+ debugSelf(rdd).zipWithIndex.map {
case (desc: String, 0) => s"$thisPrefix+-$partitionStr $desc"
case (desc: String, _) => s"$nextPrefix$desc"
} ++ debugChildren(rdd, nextPrefix)
@@ -173,7 +173,7 @@ object InputFormatInfo {
for (inputSplit <- formats) {
val splits = inputSplit.findPreferredLocations()

- for (split <- splits){
+ for (split <- splits) {
val location = split.hostLocation
val set = nodeToSplit.getOrElseUpdate(location, new HashSet[SplitInfo])
set += split
@@ -526,7 +526,7 @@ private[spark] object TaskSchedulerImpl {
val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
assert(containerList != null)
// Get the index'th entry for this host - if present
- if (index < containerList.size){
+ if (index < containerList.size) {
retval += containerList.apply(index)
found = true
}
@@ -65,7 +65,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
override def unregisterShuffle(shuffleId: Int): Boolean = {
if (shuffleMapNumber.containsKey(shuffleId)) {
val numMaps = shuffleMapNumber.remove(shuffleId)
- (0 until numMaps).map{ mapId =>
+ (0 until numMaps).map { mapId =>
shuffleBlockManager.removeDataByMap(shuffleId, mapId)
}
}
@@ -43,7 +43,7 @@ class BlockManagerId private (

def executorId: String = executorId_

- if (null != host_){
+ if (null != host_) {
Utils.checkHost(host_, "Expected hostname")
assert (port_ > 0)
}
@@ -64,7 +64,7 @@ private[spark] object UIWorkloadGenerator {
("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
("Entirely failed phase", baseData.map(x => throw new Exception).count),
("Partially failed phase", {
- baseData.map{x =>
+ baseData.map {x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
throw new Exception("This is a task failure")
@@ -73,7 +73,7 @@ private[spark] object UIWorkloadGenerator {
}.count
}),
("Partially failed phase (longer tasks)", {
- baseData.map{x =>
+ baseData.map {x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
Thread.sleep(100)
@@ -292,7 +292,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</td>
<td>
{Unparsed(
- info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("<br/>")
+ info.accumulables.map(acc => s"${acc.name}: ${acc.update.get}").mkString("<br/>")
)}
</td>
<!--
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -44,7 +44,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
*/
def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
: IndexedSeq[Double] = {
- probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
+ probabilities.toIndexedSeq.map(q => data(closestIndex(q)))
}

private def closestIndex(p: Double) = {
@@ -53,7 +53,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va

def showQuantiles(out: PrintStream = System.out): Unit = {
out.println("min\t25%\t50%\t75%\tmax")
- getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
+ getQuantiles(defaultProbabilities).foreach(q => out.print(q + "\t"))
out.println
}

@@ -81,7 +81,7 @@ private[spark] object Distribution {

def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
out.println("min\t25%\t50%\t75%\tmax")
- quantiles.foreach{q => out.print(q + "\t")}
+ quantiles.foreach(q => out.print(q + "\t"))
out.println
}
}
@@ -33,10 +33,10 @@ object DriverSubmissionTest {
val properties = System.getProperties()

println("Environment variables containing SPARK_TEST:")
- env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
+ env.filter {case (k, v) => k.contains("SPARK_TEST")}.foreach(println)

println("System properties containing spark.test:")
- properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
+ properties.filter {case (k, v) => k.toString.contains("spark.test")}.foreach(println)

for (i <- 1 until numSecondsToSleep) {
println(s"Alive for $i out of $numSecondsToSleep seconds")
@@ -55,7 +55,7 @@ object LocalFileLR {
val ITERATIONS = args(1).toInt

// Initialize w to a random value
- var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
+ var w = DenseVector.fill(D)(2 * rand.nextDouble - 1)
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
@@ -42,7 +42,7 @@ object LocalKMeans {

def generateData = {
def generatePoint(i: Int) = {
- DenseVector.fill(D){rand.nextDouble * R}
+ DenseVector.fill(D)(rand.nextDouble * R)
}
Array.tabulate(N)(generatePoint)
}
@@ -39,7 +39,7 @@ object LocalLR {
def generateData = {
def generatePoint(i: Int) = {
val y = if(i % 2 == 0) -1 else 1
- val x = DenseVector.fill(D){rand.nextGaussian + y * R}
+ val x = DenseVector.fill(D)(rand.nextGaussian + y * R)
DataPoint(x, y)
}
Array.tabulate(N)(generatePoint)
@@ -59,7 +59,7 @@ object LocalLR {

val data = generateData
// Initialize w to a random value
- var w = DenseVector.fill(D){2 * rand.nextDouble - 1}
+ var w = DenseVector.fill(D)(2 * rand.nextDouble - 1)
println("Initial w: " + w)

for (i <- 1 to ITERATIONS) {
@@ -77,7 +77,7 @@ object LogQuery {

dataSet.map(line => (extractKey(line), extractStats(line)))
.reduceByKey((a, b) => a.merge(b))
- .collect().foreach{
+ .collect().foreach {
case (user, query) => println("%s\t%s".format(user, query))}

sc.stop()