
Commit e442246

Merge github.com:apache/spark into cleanup
2 parents 88904a3 + 75d46be

58 files changed: +322 additions, -92 deletions

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ class SparkEnv private[spark] (
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    //actorSystem.awaitTermination()
+    // actorSystem.awaitTermination()
   }

 private[spark]
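The hang described in the comment above can also be sidestepped by bounding the wait instead of skipping it entirely. A minimal sketch, assuming Akka 2.1+'s timed awaitTermination(timeout) overload; shutdownGracefully is a hypothetical helper, not part of this commit:

import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.actor.ActorSystem

def shutdownGracefully(system: ActorSystem) {
  system.shutdown()
  try {
    // Bound the wait so a hung Netty shutdown cannot block the JVM forever.
    system.awaitTermination(10.seconds)
  } catch {
    case _: TimeoutException => // the Akka 2.1.x hang: give up rather than block
  }
}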

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 2 additions & 2 deletions
@@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
     // This is unfortunate, but for now we just comment it out.
     workerActorSystems.foreach(_.shutdown())
-    //workerActorSystems.foreach(_.awaitTermination())
+    // workerActorSystems.foreach(_.awaitTermination())
     masterActorSystems.foreach(_.shutdown())
-    //masterActorSystems.foreach(_.awaitTermination())
+    // masterActorSystems.foreach(_.awaitTermination())
     masterActorSystems.clear()
     workerActorSystems.clear()
   }

core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
  * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
  */
 private[spark] trait LeaderElectionAgent extends Actor {
-  //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
+  // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
   val masterActor: ActorRef
 }

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 4 deletions
@@ -112,11 +112,10 @@ private[spark] class Executor(
     }
   }

-  // Create our ClassLoader and set it on this thread
+  // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
-  Thread.currentThread.setContextClassLoader(replClassLoader)

   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
@@ -276,7 +275,6 @@ private[spark] class Executor(
         // have left some weird state around depending on when the exception was thrown, but on
         // the other hand, maybe we could detect that when future tasks fail and exit then.
         logError("Exception in task ID " + taskId, t)
-        //System.exit(1)
       }
     } finally {
       // TODO: Unregister shuffle memory only for ResultTask
@@ -294,7 +292,7 @@ private[spark] class Executor(
    * created by the interpreter to the search path
    */
   private def createClassLoader(): ExecutorURLClassLoader = {
-    val loader = this.getClass.getClassLoader
+    val loader = Thread.currentThread().getContextClassLoader

     // For each of the jars in the jarSet, add them to the class loader.
     // We assume each of the files has already been fetched.
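The createClassLoader change reverses the delegation direction: instead of installing the REPL loader onto the thread, the executor now builds its loader on top of whatever context loader the embedding application already set. A minimal sketch of the resulting chain, using plain java.net.URLClassLoader in place of Spark's ExecutorURLClassLoader; buildTaskClassLoader is a hypothetical helper for illustration:

import java.net.{URL, URLClassLoader}

def buildTaskClassLoader(userJars: Seq[URL]): URLClassLoader = {
  // Respect whatever loader the host application installed on this thread,
  // rather than the loader that happened to load the Executor class itself.
  val parent = Thread.currentThread().getContextClassLoader
  // User jars are consulted after the parent: standard parent-first delegation.
  new URLClassLoader(userJars.toArray, parent)
}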

core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
   }

   def initialize() {
-    //Add default properties in case there's no properties file
+    // Add default properties in case there's no properties file
    setDefaultProperties(properties)

    // If spark.metrics.conf is not set, try to get file in class path

core/src/main/scala/org/apache/spark/network/Connection.scala

Lines changed: 9 additions & 9 deletions
@@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
   channel.socket.setTcpNoDelay(true)
   channel.socket.setReuseAddress(true)
   channel.socket.setKeepAlive(true)
-  /*channel.socket.setReceiveBufferSize(32768) */
+  /* channel.socket.setReceiveBufferSize(32768) */

   @volatile private var closed = false
   var onCloseCallback: Connection => Unit = null
@@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

   private class Outbox {
     val messages = new Queue[Message]()
-    val defaultChunkSize = 65536 //32768 //16384
+    val defaultChunkSize = 65536
     var nextMessageToBeUsed = 0

     def addMessage(message: Message) {
       messages.synchronized{
-        /*messages += message*/
+        /* messages += message*/
         messages.enqueue(message)
         logDebug("Added [" + message + "] to outbox for sending to " +
           "[" + getRemoteConnectionManagerId() + "]")
@@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     def getChunk(): Option[MessageChunk] = {
       messages.synchronized {
         while (!messages.isEmpty) {
-          /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
-          /*val message = messages(nextMessageToBeUsed)*/
+          /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
+          /* val message = messages(nextMessageToBeUsed)*/
           val message = messages.dequeue
           val chunk = message.getChunkForSending(defaultChunkSize)
           if (chunk.isDefined) {
@@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

   val currentBuffers = new ArrayBuffer[ByteBuffer]()

-  /*channel.socket.setSendBufferSize(256 * 1024)*/
+  /* channel.socket.setSendBufferSize(256 * 1024)*/

   override def getRemoteAddress() = address

@@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
         }
         case None => {
           // changeConnectionKeyInterest(0)
-          /*key.interestOps(0)*/
+          /* key.interestOps(0)*/
           return false
         }
       }
@@ -540,10 +540,10 @@ private[spark] class ReceivingConnection(
       return false
     }

-    /*logDebug("Read " + bytesRead + " bytes for the buffer")*/
+    /* logDebug("Read " + bytesRead + " bytes for the buffer")*/

     if (currentChunk.buffer.remaining == 0) {
-      /*println("Filled buffer at " + System.currentTimeMillis)*/
+      /* println("Filled buffer at " + System.currentTimeMillis)*/
       val bufferMessage = inbox.getMessageForChunk(currentChunk).get
       if (bufferMessage.isCompletelyReceived) {
         bufferMessage.flip
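In the Outbox above, getChunkForSending splits each queued message into chunks of at most defaultChunkSize (65536) bytes, so large payloads are written to the socket incrementally. A minimal sketch of that kind of fixed-size slicing over a ByteBuffer; chunksOf is a hypothetical helper, not the actual Message API:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

def chunksOf(payload: ByteBuffer, chunkSize: Int = 65536): Seq[ByteBuffer] = {
  val chunks = new ArrayBuffer[ByteBuffer]()
  val buf = payload.duplicate()       // independent position/limit, shared contents
  while (buf.hasRemaining) {
    val n = math.min(chunkSize, buf.remaining)
    val slice = buf.slice()           // view starting at the current position
    slice.limit(n)                    // cap the view at one chunk
    chunks += slice
    buf.position(buf.position + n)    // advance past the chunk just sliced
  }
  chunks
}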

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 9 additions & 9 deletions
@@ -504,7 +504,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
       }
     }
     handleMessageExecutor.execute(runnable)
-    /*handleMessage(connection, message)*/
+    /* handleMessage(connection, message)*/
   }

   private def handleClientAuthentication(
@@ -732,7 +732,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
     logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
     val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())

-    //send security message until going connection has been authenticated
+    // send security message until going connection has been authenticated
     connection.send(message)

     wakeupSelector()
@@ -858,14 +858,14 @@ private[spark] object ConnectionManager {
       None
     })

-    /*testSequentialSending(manager)*/
-    /*System.gc()*/
+    /* testSequentialSending(manager)*/
+    /* System.gc()*/

-    /*testParallelSending(manager)*/
-    /*System.gc()*/
+    /* testParallelSending(manager)*/
+    /* System.gc()*/

-    /*testParallelDecreasingSending(manager)*/
-    /*System.gc()*/
+    /* testParallelDecreasingSending(manager)*/
+    /* System.gc()*/

     testContinuousSending(manager)
     System.gc()
@@ -947,7 +947,7 @@ private[spark] object ConnectionManager {
     val ms = finishTime - startTime
     val tput = mb * 1000.0 / ms
     println("--------------------------")
-    /*println("Started at " + startTime + ", finished at " + finishTime) */
+    /* println("Started at " + startTime + ", finished at " + finishTime) */
     println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
     println("--------------------------")
     println()
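The first hunk above keeps message handling off the selector thread: each received message is wrapped in a Runnable and dispatched to handleMessageExecutor rather than handled inline. A minimal sketch of that pattern, assuming a plain fixed-size java.util.concurrent pool rather than Spark's actual executor configuration:

import java.util.concurrent.Executors

val handleMessageExecutor = Executors.newFixedThreadPool(4)

def onMessageReceived(handle: () => Unit) {
  // Hand the (possibly slow) handler to the pool so the single selector
  // thread can keep multiplexing all open sockets without blocking.
  handleMessageExecutor.execute(new Runnable {
    override def run() { handle() }
  })
}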

core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala

Lines changed: 2 additions & 2 deletions
@@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{
     val slaves = slavesFile.mkString.split("\n")
     slavesFile.close()

-    /*println("Slaves")*/
-    /*slaves.foreach(println)*/
+    /* println("Slaves")*/
+    /* slaves.foreach(println)*/
     val tasknum = if (args.length > 2) args(2).toInt else slaves.length
     val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
     val count = if (args.length > 4) args(4).toInt else 3

core/src/main/scala/org/apache/spark/network/ReceiverTest.scala

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ private[spark] object ReceiverTest {
   println("Started connection manager with id = " + manager.id)

   manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-    /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
+    /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
     val buffer = ByteBuffer.wrap("response".getBytes)
     Some(Message.createBufferMessage(buffer, msg.id))
   })

core/src/main/scala/org/apache/spark/network/SenderTest.scala

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
   (0 until count).foreach(i => {
     val dataMessage = Message.createBufferMessage(buffer.duplicate)
     val startTime = System.currentTimeMillis
-    /*println("Started timer at " + startTime)*/
+    /* println("Started timer at " + startTime)*/
     val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
       .map { response =>
         val buffer = response.asInstanceOf[BufferMessage].buffers(0)
