File tree Expand file tree Collapse file tree 2 files changed +2
-15
lines changed
core/src/main/scala/org/apache/spark/api/python Expand file tree Collapse file tree 2 files changed +2
-15
lines changed Original file line number Diff line number Diff line change @@ -605,7 +605,6 @@ private[spark] object PythonRDD extends Logging {
605605 */
606606 private def serveIterator [T ](items : Iterator [T ], threadName : String ): Int = {
607607 val serverSocket = new ServerSocket (0 , 1 )
608- serverSocket.setReuseAddress(true )
609608 // Close the socket if no connection in 3 seconds
610609 serverSocket.setSoTimeout(3000 )
611610
Original file line number Diff line number Diff line change @@ -113,23 +113,11 @@ def _parse_memory(s):
113113
114114def _load_from_socket (port , serializer ):
115115 sock = socket .socket ()
116- sock .settimeout (1 )
116+ sock .settimeout (3 )
117117 try :
118118 sock .connect (("localhost" , port ))
119119 rf = sock .makefile ("rb" , 65536 )
120- iter = serializer .load_stream (rf )
121- try :
122- yield next (iter )
123- except socket .timeout as e :
124- # the connection is not acknowledged by JVM, retry
125- # server will be closed after 3 seconds, then it will be refused
126- for v in _load_from_socket (port , serializer ):
127- yield v
128- return
129-
130- # increase the timeout, because the server side may be slowed down by GC
131- sock .settimeout (10 )
132- for item in iter :
120+ for item in serializer .load_stream (rf ):
133121 yield item
134122 finally :
135123 sock .close ()
You can’t perform that action at this time.
0 commit comments