Skip to content

Commit 9ecc08e

Browse files
committed
Fix comments and code style
1 parent 28d1bee commit 9ecc08e

File tree

4 files changed

+12
-15
lines changed

4 files changed

+12
-15
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,16 +1859,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
18591859
new SimpleFutureAction(waiter, resultFunc)
18601860
}
18611861

1862-
/**
1863-
* Submit a job for execution and return a FutureJob holding the result. Return a Future for
1864-
* monitoring the job success or failure event.
1865-
*/
18661862
private[spark] def submitAsyncJob[T, U, R](
18671863
rdd: RDD[T],
18681864
processPartition: (TaskContext, Iterator[T]) => U,
18691865
resultHandler: (Int, U) => Unit,
1870-
resultFunc: => R): Future[Unit] =
1871-
{
1866+
resultFunc: => R): Future[Unit] = {
18721867
assertNotStopped()
18731868
val cleanF = clean(processPartition)
18741869
val callSite = getCallSite

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ private[streaming] abstract class ReceiverSupervisor(
174174
}(futureExecutionContext)
175175
}
176176

177-
/** Reschedule this receiver and return a candidate executor list */
177+
/** Return a list of candidate executors to run the receiver */
178178
def getAllowedLocations(): Seq[String] = Seq.empty
179179

180180
/** Check if receiver has been marked for stopping */

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverScheduler.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ private[streaming] case class ReceiverTrackingInfo(
3131
private[streaming] trait ReceiverScheduler {
3232

3333
/**
34-
* Return a candidate executor list to run the receiver. If the list is empty, the caller can run
35-
* this receiver in arbitrary executor.
34+
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
35+
* run this receiver in an arbitrary executor.
3636
*/
3737
def scheduleReceiver(
3838
receiverId: Int,
@@ -69,7 +69,7 @@ private[streaming] class LoadBalanceReceiverSchedulerImpl extends ReceiverSchedu
6969
val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
7070
// The probability that a scheduled receiver will run in an executor is
7171
// 1.0 / scheduledLocations.size
72-
scheduledLocations.map(location => (location, 1.0 / scheduledLocations.size))
72+
scheduledLocations.map(location => location -> 1.0 / scheduledLocations.size)
7373
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningLocation.get -> 1.0)
7474
}
7575
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,17 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
9393

9494
@volatile private var stopping = false
9595

96-
// Track receivers' status for scheduling
96+
/**
97+
* Track receivers' status for scheduling
98+
*/
9799
private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]
98100

99-
// Store all preferred locations for all receivers. We need this information to schedule receivers
101+
/**
102+
* Store all preferred locations for all receivers. We need this information to schedule receivers
103+
*/
100104
private val receiverPreferredLocations = new HashMap[Int, Option[String]]
101105

102-
// Use a separate lock to avoid dead-lock
106+
/** Use a separate lock to avoid deadlock */
103107
private val receiverTrackingInfosLock = new AnyRef
104108

105109
/** Start the endpoint and receiver execution thread. */
@@ -405,8 +409,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
405409
}
406410
val future = ssc.sparkContext.submitAsyncJob[Receiver[_], Unit, Unit](
407411
receiverRDD, startReceiver, (_, _) => Unit, ())
408-
// TODO Refactor JobWaiter to avoid creating a new Thread here. Otherwise, it's not
409-
// scalable because we need a thread for each Receiver.
410412
future.onComplete {
411413
case Success(_) =>
412414
if (stopping) {

0 commit comments

Comments
 (0)