From bacf53520a13c90a547b2bdab7b97d1795d3a739 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 5 Apr 2016 18:53:14 -0700 Subject: [PATCH 1/7] - Moved the failure check to a method - Added test case --- .../hive/execution/ScriptTransformation.scala | 44 ++++++++++++++----- .../execution/ScriptTransformationSuite.scala | 16 +++++++ 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index f6e6a75c3ee58..d2288105473f1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution import java.io._ import java.nio.charset.StandardCharsets import java.util.Properties +import java.util.concurrent.atomic.AtomicReference import javax.annotation.Nullable import scala.collection.JavaConverters._ @@ -32,7 +33,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.io.Writable -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} @@ -127,14 +128,36 @@ case class ScriptTransformation( } val mutableRow = new SpecificMutableRow(output.map(_.dataType)) + private def checkFailureAndPropagate(): Unit = { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + + // Checks if the proc is still alive (incase the command ran was bad) + // The ideal way to do this is to use Java 8's Process#isAlive() + // Unfortunately, Jenkins builds for Spark run with Java 7 so that cannot be used + // Following is a workaround used to check if a process is alive in Java 7 + // TODO: Once builds are switched to Java 8, this can be changed + try { + val exitCode = proc.exitValue() + if (exitCode != 0) { + logError(stderrBuffer.toString) // log the stderr circular buffer + throw new SparkException(s"Subprocess exited with status $exitCode. " + + s"Error: ${stderrBuffer.toString}") + } + } catch { + case _: IllegalThreadStateException => + // This means that the process is still active so the command was being launched + // Ignore the exception and move ahead + } + } + override def hasNext: Boolean = { if (outputSerde == null) { if (curLine == null) { curLine = reader.readLine() if (curLine == null) { - if (writerThread.exception.isDefined) { - throw writerThread.exception.get - } + checkFailureAndPropagate() false } else { true @@ -147,7 +170,7 @@ case class ScriptTransformation( if (scriptOutputReader != null) { if (scriptOutputReader.next(scriptOutputWritable) <= 0) { - writerThread.exception.foreach(throw _) + checkFailureAndPropagate() false } else { true @@ -158,9 +181,7 @@ case class ScriptTransformation( true } catch { case _: EOFException => - if (writerThread.exception.isDefined) { - throw writerThread.exception.get - } + checkFailureAndPropagate() false } } @@ -240,10 +261,11 @@ private class ScriptTransformationWriterThread( setDaemon(true) - @volatile private var _exception: Throwable = null + private val _exception = new AtomicReference[Throwable](null) /** Contains the exception thrown while writing the parent iterator to the external process. */ - def exception: Option[Throwable] = Option(_exception) + def exception: Option[Throwable] = + if (_exception.get() == null) None else Option(_exception.get()) override def run(): Unit = Utils.logUncaughtExceptions { TaskContext.setTaskContext(taskContext) @@ -290,7 +312,7 @@ private class ScriptTransformationWriterThread( case NonFatal(e) => // An error occurred while writing input, so kill the child process. According to the // Javadoc this call will not throw an exception: - _exception = e + _exception.set(e) proc.destroy() throw e } finally { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 19e8025d6b7c9..35098c3045447 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -109,6 +109,22 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { } assert(e.getMessage().contains("intentional exception")) } + + test("some_non_existent_command") { + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + intercept[TestFailedException] { + checkAnswer( + rowsDf, + (child: SparkPlan) => new ScriptTransformation( + input = Seq(rowsDf.col("a").expr), + script = "some_non_existent_command", + output = Seq(AttributeReference("a", StringType)()), + child = child, + ioschema = serdeIOSchema + )(hiveContext), + rowsDf.collect()) + } + } } private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode { From c672a4c4ec7f6e40fb15ad0bd96fef79a985089d Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 5 Apr 2016 18:56:10 -0700 Subject: [PATCH 2/7] Removed unwanted changes --- .../spark/sql/hive/execution/ScriptTransformation.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index d2288105473f1..68e520d37dffa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution import java.io._ import java.nio.charset.StandardCharsets import java.util.Properties -import java.util.concurrent.atomic.AtomicReference import javax.annotation.Nullable import scala.collection.JavaConverters._ @@ -261,11 +260,10 @@ private class ScriptTransformationWriterThread( setDaemon(true) - private val _exception = new AtomicReference[Throwable](null) + @volatile private var _exception: Throwable = null /** Contains the exception thrown while writing the parent iterator to the external process. */ - def exception: Option[Throwable] = - if (_exception.get() == null) None else Option(_exception.get()) + def exception: Option[Throwable] = Option(_exception) override def run(): Unit = Utils.logUncaughtExceptions { TaskContext.setTaskContext(taskContext) @@ -312,7 +310,7 @@ private class ScriptTransformationWriterThread( case NonFatal(e) => // An error occurred while writing input, so kill the child process. According to the // Javadoc this call will not throw an exception: - _exception.set(e) + _exception = e proc.destroy() throw e } finally { From aa45d1d6490acd28d0e9c9ea21c8253e8b8d9123 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Fri, 8 Apr 2016 09:31:42 -0700 Subject: [PATCH 3/7] Made the test case more robust --- .../spark/sql/execution/SparkPlanTest.scala | 7 ++++++- .../execution/ScriptTransformationSuite.scala | 19 ++++++++++--------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala index 9fe0e9646e31e..b29e822add8bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala @@ -231,7 +231,12 @@ object SparkPlanTest { } } - private def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = { + /** + * Runs the plan + * @param outputPlan SparkPlan to be executed + * @param spark SqlContext used for execution of the plan + */ + def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = { val execution = new QueryExecution(spark.sparkSession, null) { override lazy val sparkPlan: SparkPlan = outputPlan transform { case plan: SparkPlan => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 35098c3045447..39c09294dfb2f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.scalatest.exceptions.TestFailedException -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -110,20 +110,21 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { assert(e.getMessage().contains("intentional exception")) } - test("some_non_existent_command") { + test("script transformation should fail when user specifies a bad script command") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - intercept[TestFailedException] { - checkAnswer( - rowsDf, - (child: SparkPlan) => new ScriptTransformation( + + val e = intercept[SparkException] { + val plan = + new ScriptTransformation( input = Seq(rowsDf.col("a").expr), script = "some_non_existent_command", output = Seq(AttributeReference("a", StringType)()), - child = child, + child = rowsDf.queryExecution.sparkPlan, ioschema = serdeIOSchema - )(hiveContext), - rowsDf.collect()) + )(hiveContext) + SparkPlanTest.executePlan(plan, hiveContext) } + assert(e.getMessage.contains("Subprocess exited with status")) } } From 26370391c3c1714e886cf8bd43ef78ac8e0ea899 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 10 May 2016 13:52:50 -0700 Subject: [PATCH 4/7] rebased --- .../spark/sql/hive/execution/ScriptTransformation.scala | 2 +- .../spark/sql/hive/execution/ScriptTransformationSuite.scala | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 68e520d37dffa..9889939c78f65 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -134,7 +134,7 @@ case class ScriptTransformation( // Checks if the proc is still alive (incase the command ran was bad) // The ideal way to do this is to use Java 8's Process#isAlive() - // Unfortunately, Jenkins builds for Spark run with Java 7 so that cannot be used + // but it cannot be used because Spark still supports Java 7. // Following is a workaround used to check if a process is alive in Java 7 // TODO: Once builds are switched to Java 8, this can be changed try { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 39c09294dfb2f..8f962904a1ce5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -110,7 +110,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { assert(e.getMessage().contains("intentional exception")) } - test("script transformation should fail when user specifies a bad script command") { + test("SPARK-14400 script transformation should fail for bad script command") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") val e = intercept[SparkException] { @@ -120,8 +120,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { script = "some_non_existent_command", output = Seq(AttributeReference("a", StringType)()), child = rowsDf.queryExecution.sparkPlan, - ioschema = serdeIOSchema - )(hiveContext) + ioschema = serdeIOSchema) SparkPlanTest.executePlan(plan, hiveContext) } assert(e.getMessage.contains("Subprocess exited with status")) From 9a496f01416bebc35749de3c2a62534ccb6f839b Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 17 May 2016 13:56:43 -0700 Subject: [PATCH 5/7] check for `Exception` instead of `SparkException` since the writer thread might be the first to see the effect of process being killed ``` Exception in thread "Thread-ScriptTransformation-Feed" java.io.IOException: Broken pipe at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at java.io.FilterOutputStream.close(FilterOutputStream.java:158) at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformation.scala:307) at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:268) at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:268) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1793) at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformation.scala:268) ``` --- .../spark/sql/hive/execution/ScriptTransformationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 8f962904a1ce5..b12b3731920d1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -113,7 +113,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { test("SPARK-14400 script transformation should fail for bad script command") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[SparkException] { + val e = intercept[Exception] { val plan = new ScriptTransformation( input = Seq(rowsDf.col("a").expr), From 660deb30a94128a6ef9ea78b7f14d423a116b1b4 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 17 May 2016 23:58:17 -0700 Subject: [PATCH 6/7] Handle exceptions in the ScriptTransformationWriterThread --- .../hive/execution/ScriptTransformation.scala | 73 +++++++++++-------- .../execution/ScriptTransformationSuite.scala | 2 +- 2 files changed, 44 insertions(+), 31 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 9889939c78f65..ee1bea096de9f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -127,7 +127,7 @@ case class ScriptTransformation( } val mutableRow = new SpecificMutableRow(output.map(_.dataType)) - private def checkFailureAndPropagate(): Unit = { + private def checkFailureAndPropagate(cause: Throwable = null): Unit = { if (writerThread.exception.isDefined) { throw writerThread.exception.get } @@ -142,50 +142,63 @@ case class ScriptTransformation( if (exitCode != 0) { logError(stderrBuffer.toString) // log the stderr circular buffer throw new SparkException(s"Subprocess exited with status $exitCode. " + - s"Error: ${stderrBuffer.toString}") + s"Error: ${stderrBuffer.toString}", cause) } } catch { case _: IllegalThreadStateException => - // This means that the process is still active so the command was being launched - // Ignore the exception and move ahead + // This means that the process is still alive. Move ahead } } override def hasNext: Boolean = { - if (outputSerde == null) { - if (curLine == null) { - curLine = reader.readLine() + try { + if (outputSerde == null) { if (curLine == null) { - checkFailureAndPropagate() - false + curLine = reader.readLine() + if (curLine == null) { + checkFailureAndPropagate() + false + } else { + true + } } else { true } - } else { - true - } - } else if (scriptOutputWritable == null) { - scriptOutputWritable = reusedWritableObject + } else if (scriptOutputWritable == null) { + scriptOutputWritable = reusedWritableObject - if (scriptOutputReader != null) { - if (scriptOutputReader.next(scriptOutputWritable) <= 0) { - checkFailureAndPropagate() - false - } else { - true - } - } else { - try { - scriptOutputWritable.readFields(scriptOutputStream) - true - } catch { - case _: EOFException => + if (scriptOutputReader != null) { + if (scriptOutputReader.next(scriptOutputWritable) <= 0) { checkFailureAndPropagate() false + } else { + true + } + } else { + try { + scriptOutputWritable.readFields(scriptOutputStream) + true + } catch { + case _: EOFException => + // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted. + // Ideally the proc should *not* be alive at this point but + // there can be a lag between EOF being written out and the process + // being terminated. So explicitly waiting for the process to be done. + proc.waitFor() + checkFailureAndPropagate() + false + } } + } else { + true } - } else { - true + } catch { + case NonFatal(e) => + // If this exception is due to abrupt / unclean termination of `proc`, + // then detect it and propagate a better exception message for end users + checkFailureAndPropagate(e) + + throw e } } @@ -304,7 +317,6 @@ private class ScriptTransformationWriterThread( } } } - outputStream.close() threwException = false } catch { case NonFatal(e) => @@ -315,6 +327,7 @@ private class ScriptTransformationWriterThread( throw e } finally { try { + outputStream.close() if (proc.waitFor() != 0) { logError(stderrBuffer.toString) // log the stderr circular buffer } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index b12b3731920d1..8f962904a1ce5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -113,7 +113,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { test("SPARK-14400 script transformation should fail for bad script command") { val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[Exception] { + val e = intercept[SparkException] { val plan = new ScriptTransformation( input = Seq(rowsDf.col("a").expr), From 4a7b2e8827ade4b999206454092101139a92895d Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 26 May 2016 23:08:31 -0700 Subject: [PATCH 7/7] simplified the `if else` nestedness --- .../hive/execution/ScriptTransformation.scala | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index ee1bea096de9f..9e25e1d40ce81 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -157,12 +157,8 @@ case class ScriptTransformation( curLine = reader.readLine() if (curLine == null) { checkFailureAndPropagate() - false - } else { - true + return false } - } else { - true } } else if (scriptOutputWritable == null) { scriptOutputWritable = reusedWritableObject @@ -170,14 +166,11 @@ case class ScriptTransformation( if (scriptOutputReader != null) { if (scriptOutputReader.next(scriptOutputWritable) <= 0) { checkFailureAndPropagate() - false - } else { - true + return false } } else { try { scriptOutputWritable.readFields(scriptOutputStream) - true } catch { case _: EOFException => // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted. @@ -186,12 +179,12 @@ case class ScriptTransformation( // being terminated. So explicitly waiting for the process to be done. proc.waitFor() checkFailureAndPropagate() - false + return false } } - } else { - true } + + true } catch { case NonFatal(e) => // If this exception is due to abrupt / unclean termination of `proc`,