@@ -122,6 +122,17 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
122122 // be called.
123123 private val receiverJobExitLatch = new CountDownLatch (receiverInputStreams.size)
124124
125+ /**
126+ * Track all receivers' information. The key is the receiver id, the value is the receiver info.
127+ */
128+ private val receiverTrackingInfos = new HashMap [Int , ReceiverTrackingInfo ]
129+
130+ /**
131+ * Store all preferred locations for all receivers. We need this information to schedule
132+ * receivers
133+ */
134+ private val receiverPreferredLocations = new HashMap [Int , Option [String ]]
135+
125136 /** Start the endpoint and receiver execution thread. */
126137 def start (): Unit = synchronized {
127138 if (isTrackerStarted) {
@@ -191,9 +202,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
191202
192203 /** Get the blocks allocated to the given batch and stream. */
193204 def getBlocksOfBatchAndStream (batchTime : Time , streamId : Int ): Seq [ReceivedBlockInfo ] = {
194- synchronized {
195- receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId)
196- }
205+ receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId)
197206 }
198207
199208 /**
@@ -211,6 +220,119 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
211220 }
212221 }
213222
223+ /** Register a receiver */
224+ private def registerReceiver (
225+ streamId : Int ,
226+ typ : String ,
227+ host : String ,
228+ receiverEndpoint : RpcEndpointRef ,
229+ senderAddress : RpcAddress
230+ ): Boolean = {
231+ if (! receiverInputStreamIds.contains(streamId)) {
232+ throw new SparkException (" Register received for unexpected id " + streamId)
233+ }
234+
235+ if (isTrackerStopping || isTrackerStopped) {
236+ false
237+ } else if (! ssc.sparkContext.isLocal && // We don't need to schedule it in the local mode
238+ ! scheduleReceiver(streamId).contains(host)) {
239+ // Refuse it since it's scheduled to a wrong executor
240+ false
241+ } else {
242+ val name = s " ${typ}- ${streamId}"
243+ val receiverInfo = ReceiverInfo (streamId, name, true , host)
244+ receiverTrackingInfos.put(streamId,
245+ ReceiverTrackingInfo (
246+ streamId,
247+ ReceiverState .ACTIVE ,
248+ scheduledLocations = None ,
249+ runningLocation = Some (host),
250+ name = Some (name),
251+ endpoint = Some (receiverEndpoint)))
252+ listenerBus.post(StreamingListenerReceiverStarted (receiverInfo))
253+ logInfo(" Registered receiver for stream " + streamId + " from " + senderAddress)
254+ true
255+ }
256+ }
257+
258+ /** Deregister a receiver */
259+ private def deregisterReceiver (streamId : Int , message : String , error : String ) {
260+ val lastErrorTime =
261+ if (error == null || error == " " ) - 1 else ssc.scheduler.clock.getTimeMillis()
262+ val errorInfo = ReceiverErrorInfo (
263+ lastErrorMessage = message, lastError = error, lastErrorTime = lastErrorTime)
264+ val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
265+ case Some (oldInfo) =>
266+ oldInfo.copy(errorInfo = Some (errorInfo))
267+ case None =>
268+ logWarning(" No prior receiver info" )
269+ ReceiverTrackingInfo (
270+ streamId, ReceiverState .INACTIVE , None , None , None , None , Some (errorInfo))
271+ }
272+ receiverTrackingInfos -= streamId
273+ listenerBus.post(StreamingListenerReceiverStopped (newReceiverTrackingInfo.toReceiverInfo))
274+ val messageWithError = if (error != null && ! error.isEmpty) {
275+ s " $message - $error"
276+ } else {
277+ s " $message"
278+ }
279+ logError(s " Deregistered receiver for stream $streamId: $messageWithError" )
280+ }
281+
282+ /** Add new blocks for the given stream */
283+ private def addBlock (receivedBlockInfo : ReceivedBlockInfo ): Boolean = {
284+ receivedBlockTracker.addBlock(receivedBlockInfo)
285+ }
286+
287+ /** Report error sent by a receiver */
288+ private def reportError (streamId : Int , message : String , error : String ) {
289+ val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
290+ case Some (oldInfo) =>
291+ val errorInfo = ReceiverErrorInfo (lastErrorMessage = message, lastError = error,
292+ lastErrorTime = oldInfo.errorInfo.map(_.lastErrorTime).getOrElse(- 1L ))
293+ oldInfo.copy(errorInfo = Some (errorInfo))
294+ case None =>
295+ logWarning(" No prior receiver info" )
296+ val errorInfo = ReceiverErrorInfo (lastErrorMessage = message, lastError = error,
297+ lastErrorTime = ssc.scheduler.clock.getTimeMillis())
298+ ReceiverTrackingInfo (
299+ streamId, ReceiverState .INACTIVE , None , None , None , None , Some (errorInfo))
300+ }
301+
302+ receiverTrackingInfos(streamId) = newReceiverTrackingInfo
303+ listenerBus.post(StreamingListenerReceiverError (newReceiverTrackingInfo.toReceiverInfo))
304+ val messageWithError = if (error != null && ! error.isEmpty) {
305+ s " $message - $error"
306+ } else {
307+ s " $message"
308+ }
309+ logWarning(s " Error reported by receiver for stream $streamId: $messageWithError" )
310+ }
311+
312+ private def scheduleReceiver (receiverId : Int ): Seq [String ] = {
313+ val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None )
314+ val scheduledLocations = schedulingPolicy.scheduleReceiver(
315+ receiverId, preferredLocation, receiverTrackingInfos, getExecutors(ssc))
316+ updateReceiverScheduledLocations(receiverId, scheduledLocations)
317+ scheduledLocations
318+ }
319+
320+ private def updateReceiverScheduledLocations (
321+ receiverId : Int , scheduledLocations : Seq [String ]): Unit = {
322+ val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
323+ case Some (oldInfo) =>
324+ oldInfo.copy(state = ReceiverState .SCHEDULED ,
325+ scheduledLocations = Some (scheduledLocations))
326+ case None =>
327+ ReceiverTrackingInfo (
328+ receiverId,
329+ ReceiverState .SCHEDULED ,
330+ Some (scheduledLocations),
331+ None )
332+ }
333+ receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
334+ }
335+
214336 /** Check if any blocks are left to be processed */
215337 def hasUnallocatedBlocks : Boolean = {
216338 receivedBlockTracker.hasUnallocatedReceivedBlocks
@@ -237,17 +359,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
237359 /** RpcEndpoint to receive messages from the receivers. */
238360 private class ReceiverTrackerEndpoint (override val rpcEnv : RpcEnv ) extends ThreadSafeRpcEndpoint {
239361
240- /**
241- * Track all receivers' information. The key is the receiver id, the value is the receiver info.
242- */
243- private val receiverTrackingInfos = new HashMap [Int , ReceiverTrackingInfo ]
244-
245- /**
246- * Store all preferred locations for all receivers. We need this information to schedule
247- * receivers
248- */
249- private val receiverPreferredLocations = new HashMap [Int , Option [String ]]
250-
251362 // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
252363 private val submitJobThreadPool = ExecutionContext .fromExecutorService(
253364 ThreadUtils .newDaemonCachedThreadPool(" submit-job-thead-pool" ))
@@ -352,119 +463,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
352463 }
353464 }
354465
355- /** Register a receiver */
356- private def registerReceiver (
357- streamId : Int ,
358- typ : String ,
359- host : String ,
360- receiverEndpoint : RpcEndpointRef ,
361- senderAddress : RpcAddress
362- ): Boolean = {
363- if (! receiverInputStreamIds.contains(streamId)) {
364- throw new SparkException (" Register received for unexpected id " + streamId)
365- }
366-
367- if (isTrackerStopping || isTrackerStopped) {
368- false
369- } else if (! ssc.sparkContext.isLocal && // We don't need to schedule it in the local mode
370- ! scheduleReceiver(streamId).contains(host)) {
371- // Refuse it since it's scheduled to a wrong executor
372- false
373- } else {
374- val name = s " ${typ}- ${streamId}"
375- val receiverInfo = ReceiverInfo (streamId, name, true , host)
376- receiverTrackingInfos.put(streamId,
377- ReceiverTrackingInfo (
378- streamId,
379- ReceiverState .ACTIVE ,
380- scheduledLocations = None ,
381- runningLocation = Some (host),
382- name = Some (name),
383- endpoint = Some (receiverEndpoint)))
384- listenerBus.post(StreamingListenerReceiverStarted (receiverInfo))
385- logInfo(" Registered receiver for stream " + streamId + " from " + senderAddress)
386- true
387- }
388- }
389-
390- /** Deregister a receiver */
391- private def deregisterReceiver (streamId : Int , message : String , error : String ) {
392- val lastErrorTime =
393- if (error == null || error == " " ) - 1 else ssc.scheduler.clock.getTimeMillis()
394- val errorInfo = ReceiverErrorInfo (
395- lastErrorMessage = message, lastError = error, lastErrorTime = lastErrorTime)
396- val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
397- case Some (oldInfo) =>
398- oldInfo.copy(errorInfo = Some (errorInfo))
399- case None =>
400- logWarning(" No prior receiver info" )
401- ReceiverTrackingInfo (
402- streamId, ReceiverState .INACTIVE , None , None , None , None , Some (errorInfo))
403- }
404- receiverTrackingInfos -= streamId
405- listenerBus.post(StreamingListenerReceiverStopped (newReceiverTrackingInfo.toReceiverInfo))
406- val messageWithError = if (error != null && ! error.isEmpty) {
407- s " $message - $error"
408- } else {
409- s " $message"
410- }
411- logError(s " Deregistered receiver for stream $streamId: $messageWithError" )
412- }
413-
414- /** Add new blocks for the given stream */
415- private def addBlock (receivedBlockInfo : ReceivedBlockInfo ): Boolean = {
416- receivedBlockTracker.addBlock(receivedBlockInfo)
417- }
418-
419- /** Report error sent by a receiver */
420- private def reportError (streamId : Int , message : String , error : String ) {
421- val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
422- case Some (oldInfo) =>
423- val errorInfo = ReceiverErrorInfo (lastErrorMessage = message, lastError = error,
424- lastErrorTime = oldInfo.errorInfo.map(_.lastErrorTime).getOrElse(- 1L ))
425- oldInfo.copy(errorInfo = Some (errorInfo))
426- case None =>
427- logWarning(" No prior receiver info" )
428- val errorInfo = ReceiverErrorInfo (lastErrorMessage = message, lastError = error,
429- lastErrorTime = ssc.scheduler.clock.getTimeMillis())
430- ReceiverTrackingInfo (
431- streamId, ReceiverState .INACTIVE , None , None , None , None , Some (errorInfo))
432- }
433-
434- receiverTrackingInfos(streamId) = newReceiverTrackingInfo
435- listenerBus.post(StreamingListenerReceiverError (newReceiverTrackingInfo.toReceiverInfo))
436- val messageWithError = if (error != null && ! error.isEmpty) {
437- s " $message - $error"
438- } else {
439- s " $message"
440- }
441- logWarning(s " Error reported by receiver for stream $streamId: $messageWithError" )
442- }
443-
444- private def updateReceiverScheduledLocations (
445- receiverId : Int , scheduledLocations : Seq [String ]): Unit = {
446- val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
447- case Some (oldInfo) =>
448- oldInfo.copy(state = ReceiverState .SCHEDULED ,
449- scheduledLocations = Some (scheduledLocations))
450- case None =>
451- ReceiverTrackingInfo (
452- receiverId,
453- ReceiverState .SCHEDULED ,
454- Some (scheduledLocations),
455- None )
456- }
457- receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
458- }
459-
460- private def scheduleReceiver (receiverId : Int ): Seq [String ] = {
461- val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None )
462- val scheduledLocations = schedulingPolicy.scheduleReceiver(
463- receiverId, preferredLocation, receiverTrackingInfos, getExecutors(ssc))
464- updateReceiverScheduledLocations(receiverId, scheduledLocations)
465- scheduledLocations
466- }
467-
468466 /** Send stop signal to the receivers. */
469467 private def stopReceivers () {
470468 // Signal the receivers to stop
0 commit comments