From d0df92f8291e260d0a1c06d64d5a2bce75ffb0d0 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 17 Aug 2015 16:42:00 -0700 Subject: [PATCH 1/3] [SPARK-7736] [core] Fix a race introduced in PythonRunner. The fix for SPARK-7736 introduced a race where a port value of "-1" could be passed down to the pyspark process, causing it to fail to connect back to the JVM. This change adds code to fix that race. --- .../scala/org/apache/spark/deploy/PythonRunner.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 4277ac2ad13ea..a0c93aa32bce0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.util.Try -import org.apache.spark.SparkUserAppException +import org.apache.spark.{SparkException, SparkUserAppException} import org.apache.spark.api.python.PythonUtils import org.apache.spark.util.{RedirectThread, Utils} @@ -56,6 +56,16 @@ object PythonRunner { thread.setDaemon(true) thread.start() + // Wait until the gateway server has started, so that we know which port is it bound to. + // Fail if the thread exits before it's set. + while (thread.isAlive() && gatewayServer.getListeningPort() == -1) { + thread.join(10) + } + if (gatewayServer.getListeningPort() == -1) { + gatewayServer.shutdown() + throw new SparkException("Python GatewayServer failed to start.") + } + // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the // python directories in SPARK_HOME (if set), and any files in the pyFiles argument val pathElements = new ArrayBuffer[String] From 30b0ee5edba28c3fb401a0ada56c1c54d17cab28 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 17 Aug 2015 16:48:35 -0700 Subject: [PATCH 2/3] Re-use the thread created for running the gateway. --- core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index a0c93aa32bce0..396dc31e2e096 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -49,7 +49,7 @@ object PythonRunner { val gatewayServer = new py4j.GatewayServer(null, 0) val thread = new Thread(new Runnable() { override def run(): Unit = Utils.logUncaughtExceptions { - gatewayServer.start() + gatewayServer.start(false) } }) thread.setName("py4j-gateway") From cfef35df1eafc610701dad7407ca9bba9d8c50f1 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 17 Aug 2015 16:58:45 -0700 Subject: [PATCH 3/3] Simpler fix. --- .../org/apache/spark/deploy/PythonRunner.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 396dc31e2e096..23d01e9cbb9f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.util.Try -import org.apache.spark.{SparkException, SparkUserAppException} +import org.apache.spark.SparkUserAppException import org.apache.spark.api.python.PythonUtils import org.apache.spark.util.{RedirectThread, Utils} @@ -49,22 +49,18 @@ object PythonRunner { val gatewayServer = new py4j.GatewayServer(null, 0) val thread = new Thread(new Runnable() { override def run(): Unit = Utils.logUncaughtExceptions { - gatewayServer.start(false) + gatewayServer.start() } }) - thread.setName("py4j-gateway") + thread.setName("py4j-gateway-init") thread.setDaemon(true) thread.start() // Wait until the gateway server has started, so that we know which port is it bound to. - // Fail if the thread exits before it's set. - while (thread.isAlive() && gatewayServer.getListeningPort() == -1) { - thread.join(10) - } - if (gatewayServer.getListeningPort() == -1) { - gatewayServer.shutdown() - throw new SparkException("Python GatewayServer failed to start.") - } + // `gatewayServer.start()` will start a new thread and run the server code there, after + // initializing the socket, so the thread started above will end as soon as the server is + // ready to serve connections. + thread.join() // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the // python directories in SPARK_HOME (if set), and any files in the pyFiles argument