Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
*/
private val receiverPreferredLocations = new HashMap[Int, Option[String]]

// start receiver after maxRegisteredWaitingTime milliseconds
private val maxRegisteredWaitingTimeMs =
ssc.conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
private val createTime = System.currentTimeMillis()

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
Expand Down Expand Up @@ -414,21 +419,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}

/**
* Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
* receivers to be scheduled on the same node.
*
* TODO Should poll the executor number and wait for executors according to
* "spark.scheduler.minRegisteredResourcesRatio" and
* "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
*/
private def runDummySparkJob(): Unit = {
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
assert(getExecutors.nonEmpty)
}

/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
Expand All @@ -440,7 +430,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
rcvr
}

runDummySparkJob()
while ((System.currentTimeMillis() - createTime) < maxRegisteredWaitingTimeMs) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't spin on a condition like this; it'll waste CPU in millions of system calls. This also forces a delay of this waiting time, which is not OK.

Copy link
Author

@Astralidea Astralidea Oct 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. But I think it only waste a little time, and it better because it could be config
and how to write code gracefully?
I hope to make it more better but do not know how to do it.


logInfo("Receiver is ready for scheduling beginning after waiting " +
s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")

logInfo("Starting " + receivers.length + " receivers")
endpoint.send(StartAllReceivers(receivers))
Expand Down