
Commit 20eb748

Address comments and fix docs
1 parent 5fd8684 commit 20eb748

File tree

4 files changed, +27 -19 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 11 additions & 11 deletions
@@ -138,8 +138,8 @@ private[spark] object PythonRDD extends Logging {
    * or to enable local execution.
    *
    * @return 3-tuple (as a Java array) with the port number of a local socket which serves the
-   *         data collected from this job, the secret for authentication, and a server object
-   *         that can be used to sync the JVM serving thread in Python.
+   *         data collected from this job, the secret for authentication, and a socket auth
+   *         server object that can be used to join the JVM serving thread in Python.
    */
   def runJob(
       sc: SparkContext,
@@ -158,8 +158,8 @@ private[spark] object PythonRDD extends Logging {
    * A helper function to collect an RDD as an iterator, then serve it via socket.
    *
    * @return 3-tuple (as a Java array) with the port number of a local socket which serves the
-   *         data collected from this job, the secret for authentication, and a server object
-   *         that can be used to sync the JVM serving thread in Python.
+   *         data collected from this job, the secret for authentication, and a socket auth
+   *         server object that can be used to join the JVM serving thread in Python.
    */
   def collectAndServe[T](rdd: RDD[T]): Array[Any] = {
     serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
@@ -170,12 +170,12 @@ private[spark] object PythonRDD extends Logging {
    * are collected as separate jobs, by order of index. Partition data is first requested by a
    * non-zero integer to start a collection job. The response is prefaced by an integer with 1
    * meaning partition data will be served, 0 meaning the local iterator has been consumed,
-   * and -1 meaining an error occurred during collection. This function is used by
+   * and -1 meaning an error occurred during collection. This function is used by
    * pyspark.rdd._local_iterator_from_socket().
    *
    * @return 3-tuple (as a Java array) with the port number of a local socket which serves the
-   *         data collected from this job, the secret for authentication, and a server object
-   *         that can be used to sync the JVM serving thread in Python.
+   *         data collected from this job, the secret for authentication, and a socket auth
+   *         server object that can be used to join the JVM serving thread in Python.
    */
   def toLocalIteratorAndServe[T](rdd: RDD[T]): Array[Any] = {
     val handleFunc = (sock: Socket) => {
@@ -447,8 +447,8 @@ private[spark] object PythonRDD extends Logging {
    * The thread will terminate after all the data are sent or any exceptions happen.
    *
    * @return 3-tuple (as a Java array) with the port number of a local socket which serves the
-   *         data collected from this job, the secret for authentication, and a server object
-   *         that can be used to sync the JVM serving thread in Python.
+   *         data collected from this job, the secret for authentication, and a socket auth
+   *         server object that can be used to join the JVM serving thread in Python.
    */
   def serveIterator(items: Iterator[_], threadName: String): Array[Any] = {
     serveToStream(threadName) { out =>
@@ -470,8 +470,8 @@ private[spark] object PythonRDD extends Logging {
    * exceptions happen.
    *
    * @return 3-tuple (as a Java array) with the port number of a local socket which serves the
-   *         data collected from this job, the secret for authentication, and a server object
-   *         that can be used to sync the JVM serving thread in Python.
+   *         data collected from this job, the secret for authentication, and a socket auth
+   *         server object that can be used to join the JVM serving thread in Python.
    */
   private[spark] def serveToStream(
       threadName: String)(writeFunc: OutputStream => Unit): Array[Any] = {
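
All five Scaladoc blocks now describe the same contract: the JVM returns (port, secret, socket auth server) and Python is expected to read the served data and then join the serving thread. A minimal sketch of that consuming pattern on the Python side, condensed from the helpers visible in the diffs below (`_load_from_socket`, `getResult()`); it is an illustration of the contract, not the exact PySpark code:

    # Hedged sketch: consume the 3-tuple returned by runJob/collectAndServe.
    # _load_from_socket is PySpark's private helper shown in the rdd.py and
    # dataframe.py diffs below.
    def collect_via_socket(sock_info, serializer):
        port, auth_secret, jsocket_auth_server = sock_info
        try:
            # Connect to the local port, authenticate with the secret, and
            # deserialize the streamed items.
            return list(_load_from_socket((port, auth_secret), serializer))
        finally:
            # Join the JVM serving thread; this re-raises any exception the
            # JVM hit while writing the data.
            jsocket_auth_server.getResult()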

core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala

Lines changed: 12 additions & 4 deletions
@@ -32,6 +32,9 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 /**
  * Creates a server in the JVM to communicate with external processes (e.g., Python and R) for
  * handling one batch of data, with authentication and error handling.
+ *
+ * The socket server can only accept one connection, or close if no connection
+ * in 15 seconds.
  */
 private[spark] abstract class SocketAuthServer[T](
     authHelper: SocketAuthHelper,
@@ -88,9 +91,9 @@ private[spark] abstract class SocketAuthServer[T](
 }
 
 /**
- * Create a socket server class and run user function on the socket in a background thread.
- * This is the same as calling SocketAuthServer.setupOneConnectionServer except it creates
- * a server object that can then be synced from Python.
+ * Create a socket server class and run user function on the socket in a background thread
+ * that can read and write to the socket input/output streams. The function is passed in a
+ * socket that has been connected and authenticated.
  */
 private[spark] class SocketFuncServer(
     authHelper: SocketAuthHelper,
@@ -108,10 +111,15 @@ private[spark] object SocketAuthServer {
    * Convenience function to create a socket server and run a user function in a background
    * thread to write to an output stream.
    *
+   * The socket server can only accept one connection, or close if no connection
+   * in 15 seconds.
+   *
    * @param threadName Name for the background serving thread.
    * @param authHelper SocketAuthHelper for authentication
    * @param writeFunc User function to write to a given OutputStream
-   * @return
+   * @return 3-tuple (as a Java array) with the port number of a local socket which serves the
+   *         data collected from this job, the secret for authentication, and a socket auth
+   *         server object that can be used to join the JVM serving thread in Python.
    */
   def serveToStream(
       threadName: String,
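
The "one connection, or close if no connection in 15 seconds" behavior added to the Scaladoc can be pictured with a small, self-contained Python analogy (the real implementation is the Scala code in this file; this sketch only mirrors the accept semantics it documents):

    import socket

    # Analogy for SocketAuthServer's accept behavior: bind an ephemeral local
    # port, wait at most 15 seconds for exactly one client, then shut down.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))   # port 0 lets the OS pick a free port
    server.listen(1)                # allow a single pending connection
    server.settimeout(15)           # close if no connection in 15 seconds
    port = server.getsockname()[1]  # the port handed back to the client side
    try:
        conn, _ = server.accept()   # serve exactly one connection
        conn.close()
    except socket.timeout:
        pass                        # no client connected in time
    finally:
        server.close()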

python/pyspark/rdd.py

Lines changed: 2 additions & 2 deletions
@@ -174,7 +174,7 @@ class PyLocalIterable(object):
         """ Create a synchronous local iterable over a socket """
 
         def __init__(self, _sock_info, _serializer):
-            port, auth_secret, self.jserver_obj = _sock_info
+            port, auth_secret, self.jsocket_auth_server = _sock_info
             self._sockfile = _create_local_socket((port, auth_secret))
             self._serializer = _serializer
             self._read_iter = iter([])  # Initialize as empty iterator
@@ -197,7 +197,7 @@ def __iter__(self):
 
                 # An error occurred, join serving thread and raise any exceptions from the JVM
                 elif self._read_status == -1:
-                    self.jserver_obj.getResult()
+                    self.jsocket_auth_server.getResult()
 
         def __del__(self):
             # If local iterator is not fully consumed,
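
The renamed `jsocket_auth_server` attribute ties into the request/response protocol documented in `toLocalIteratorAndServe` above: Python writes a non-zero integer to request the next partition and reads back 1 (data follows), 0 (iterator consumed), or -1 (error). A hedged sketch of that client loop, simplified to one `load_stream` pass per partition, with `write_int`/`read_int` standing in for PySpark's framed-integer serializer helpers:

    # Illustrative client loop for the local-iterator protocol; write_int and
    # read_int are stand-ins for pyspark.serializers helpers.
    def iterate_partitions(sockfile, serializer, jsocket_auth_server):
        while True:
            write_int(1, sockfile)          # non-zero: request next partition
            sockfile.flush()
            status = read_int(sockfile)
            if status == 1:                 # partition data will be served
                for item in serializer.load_stream(sockfile):
                    yield item
            elif status == 0:               # local iterator fully consumed
                return
            elif status == -1:              # collection failed in the JVM
                jsocket_auth_server.getResult()  # join thread, raise the error
                return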

python/pyspark/sql/dataframe.py

Lines changed: 2 additions & 2 deletions
@@ -2200,14 +2200,14 @@ def _collectAsArrow(self):
         .. note:: Experimental.
         """
         with SCCallSiteSync(self._sc) as css:
-            port, auth_secret, jserver_obj = self._jdf.collectAsArrowToPython()
+            port, auth_secret, jsocket_auth_server = self._jdf.collectAsArrowToPython()
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
             results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
         finally:
             # Join serving thread and raise any exceptions from collectAsArrowToPython
-            jserver_obj.getResult()
+            jsocket_auth_server.getResult()
 
         # Separate RecordBatches from batch order indices in results
         batches = results[:-1]
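
The lines just after this hunk (the last context line is visible above) reorder the collected batches: the final element of `results` is a list of indices giving the correct batch order. A condensed sketch of that step, assuming `results` holds the Arrow RecordBatches followed by that index list:

    # results = [batch_0, ..., batch_n, [index list with the correct order]]
    batches = results[:-1]       # the un-ordered Arrow RecordBatches
    batch_order = results[-1]    # indices mapping positions to correct order
    ordered_batches = [batches[i] for i in batch_order]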
