@@ -40,8 +40,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
4040 var daemon : Process = null
4141 val daemonHost = InetAddress .getByAddress(Array (127 , 0 , 0 , 1 ))
4242 var daemonPort : Int = 0
43- var daemonWorkers = new mutable.WeakHashMap [Socket , Int ]()
44- var idleWorkers = new mutable.Queue [Socket ]()
43+ val daemonWorkers = new mutable.WeakHashMap [Socket , Int ]()
44+ val idleWorkers = new mutable.Queue [Socket ]()
45+ var lastActivity = 0L
46+ new MonitorThread ().start()
4547
4648 var simpleWorkers = new mutable.WeakHashMap [Socket , Process ]()
4749
@@ -52,8 +54,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
5254
5355 def create (): Socket = {
5456 if (useDaemon) {
55- if (idleWorkers.length > 0 ) {
56- return idleWorkers.dequeue()
57+ idleWorkers.synchronized {
58+ if (idleWorkers.size > 0 ) {
59+ return idleWorkers.dequeue()
60+ }
5761 }
5862 createThroughDaemon()
5963 } else {
@@ -203,6 +207,35 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
203207 }
204208 }
205209
210+ /**
211+ * Monitor all the idle workers, kill them after timeout.
212+ */
213+ private class MonitorThread extends Thread (s " Idle Worker Monitor for $pythonExec" ) {
214+
215+ setDaemon(true )
216+
217+ override def run () {
218+ while (true ) {
219+ idleWorkers.synchronized {
220+ if (lastActivity + IDLE_WORKER_TIMEOUT_MS < System .currentTimeMillis()) {
221+ while (idleWorkers.length > 0 ) {
222+ val worker = idleWorkers.dequeue()
223+ try {
224+ // the Python worker will exit after closing the socket
225+ worker.close()
226+ } catch {
227+ case e : Exception =>
228+ logWarning(" Failed to close worker socket" , e)
229+ }
230+ }
231+ lastActivity = System .currentTimeMillis()
232+ }
233+ }
234+ Thread .sleep(10000 )
235+ }
236+ }
237+ }
238+
206239 private def stopDaemon () {
207240 synchronized {
208241 if (useDaemon) {
@@ -242,7 +275,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
242275
243276 def releaseWorker (worker : Socket ) {
244277 if (useDaemon && envVars.get(" SPARK_REUSE_WORKER" ).isDefined) {
278+ idleWorkers.synchronized {
279+ lastActivity = System .currentTimeMillis()
245280 idleWorkers.enqueue(worker)
281+ }
246282 } else {
247283 // Cleanup the worker socket. This will also cause the Python worker to exit.
248284 try {
@@ -257,4 +293,5 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
257293
258294private object PythonWorkerFactory {
259295 val PROCESS_WAIT_TIMEOUT_MS = 10000
296+ val IDLE_WORKER_TIMEOUT_MS = 60000
260297}
0 commit comments