Skip to content

Commit 6bef376

Browse files
committed
Remove submitJobThreadPool since submitJob doesn't create a separate thread to wait for the job result
1 parent 513e3b0 commit 6bef376

File tree

1 file changed

+1
-6
lines changed

1 file changed

+1
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
435435
/** RpcEndpoint to receive messages from the receivers. */
436436
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
437437

438-
// TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
439-
private val submitJobThreadPool = ExecutionContext.fromExecutorService(
440-
ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))
441-
442438
private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
443439
ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
444440

@@ -610,12 +606,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
610606
logInfo(s"Restarting Receiver $receiverId")
611607
self.send(RestartReceiver(receiver))
612608
}
613-
}(submitJobThreadPool)
609+
}(ThreadUtils.sameThread)
614610
logInfo(s"Receiver ${receiver.streamId} started")
615611
}
616612

617613
override def onStop(): Unit = {
618-
submitJobThreadPool.shutdownNow()
619614
active = false
620615
walBatchingThreadPool.shutdown()
621616
}

0 commit comments

Comments
 (0)