@@ -231,7 +231,12 @@ object SparkPlanTest {
     }
   }

-  private def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
+  /**
+   * Runs the plan.
+   * @param outputPlan the SparkPlan to be executed
+   * @param spark the SQLContext used for execution of the plan
+   */
+  def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
     val execution = new QueryExecution(spark.sparkSession, null) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.io.Writable

-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -127,45 +127,71 @@ case class ScriptTransformation(
       }
       val mutableRow = new SpecificMutableRow(output.map(_.dataType))

+      private def checkFailureAndPropagate(cause: Throwable = null): Unit = {
+        if (writerThread.exception.isDefined) {
+          throw writerThread.exception.get
+        }
+
+        // Checks if the proc is still alive (in case the command that was run was bad).
+        // The ideal way to do this is to use Java 8's Process#isAlive(),
+        // but it cannot be used because Spark still supports Java 7.
+        // The following is a workaround used to check if a process is alive in Java 7.
+        // TODO: Once builds are switched to Java 8, this can be changed.
+        try {
+          val exitCode = proc.exitValue()
+          if (exitCode != 0) {
+            logError(stderrBuffer.toString) // log the stderr circular buffer
+            throw new SparkException(s"Subprocess exited with status $exitCode. " +
+              s"Error: ${stderrBuffer.toString}", cause)
[inline review comment — Member]
Unrelated to this PR, but we should probably only print the partial contents of the circular buffer if the number of bytes written to it is less than its total size.

[reply — Contributor, author]
@sameeragarwal: Thanks for pointing that out. I will submit a separate PR for that.

[reply — Contributor, author]
Here: #13351
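To make the suggestion concrete, here is a minimal sketch of the check being proposed (a hedged illustration, not code from this PR; the names `contents`, `bytesWritten`, and `capacity` are illustrative parameters, not fields of Spark's actual CircularBuffer class):

// A circular buffer only holds complete output while no more bytes than its
// capacity have been written; after that it wraps and keeps only the tail.
def describeStderr(contents: String, bytesWritten: Long, capacity: Int): String = {
  if (bytesWritten <= capacity) {
    contents // the buffer never wrapped; this is the complete stderr output
  } else {
    s"(stderr truncated to the last $capacity bytes) $contents"
  }
}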

+          }
+        } catch {
+          case _: IllegalThreadStateException =>
+            // This means that the process is still alive. Move ahead.
+        }
+      }

       override def hasNext: Boolean = {
-        if (outputSerde == null) {
-          if (curLine == null) {
-            curLine = reader.readLine()
-            if (curLine == null) {
-              if (writerThread.exception.isDefined) {
-                throw writerThread.exception.get
-              }
-              false
-            } else {
-              true
-            }
-          } else {
-            true
-          }
-        } else if (scriptOutputWritable == null) {
-          scriptOutputWritable = reusedWritableObject
-
-          if (scriptOutputReader != null) {
-            if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
-              writerThread.exception.foreach(throw _)
-              false
-            } else {
-              true
-            }
-          } else {
-            try {
-              scriptOutputWritable.readFields(scriptOutputStream)
-              true
-            } catch {
-              case _: EOFException =>
-                if (writerThread.exception.isDefined) {
-                  throw writerThread.exception.get
-                }
-                false
-            }
-          }
-        } else {
-          true
-        }
+        try {
+          if (outputSerde == null) {
+            if (curLine == null) {
+              curLine = reader.readLine()
+              if (curLine == null) {
+                checkFailureAndPropagate()
+                return false
+              }
+            }
+          } else if (scriptOutputWritable == null) {
+            scriptOutputWritable = reusedWritableObject
+
+            if (scriptOutputReader != null) {
+              if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
+                checkFailureAndPropagate()
+                return false
+              }
+            } else {
+              try {
+                scriptOutputWritable.readFields(scriptOutputStream)
+              } catch {
+                case _: EOFException =>
+                  // This means that the stdout of `proc` (i.e. the TRANSFORM process) has been
+                  // exhausted. Ideally the proc should *not* be alive at this point, but
+                  // there can be a lag between EOF being written out and the process being
+                  // terminated, so explicitly wait for the process to finish.
+                  proc.waitFor()
+                  checkFailureAndPropagate()
+                  return false
+              }
+            }
+          }
+
+          true
+        } catch {
+          case NonFatal(e) =>
+            // If this exception is due to abrupt / unclean termination of `proc`,
+            // then detect it and propagate a better exception message for end users.
+            checkFailureAndPropagate(e)
+
+            throw e
+        }
       }
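For context, the liveness check inside checkFailureAndPropagate reduces to the following Java 7 idiom (a standalone sketch assuming a POSIX `sleep` binary, not code from this PR): java.lang.Process#exitValue() throws IllegalThreadStateException while the subprocess is still running, so the exception doubles as an "is it alive?" probe.

// Returns true while the subprocess has not yet terminated.
def isAlive(proc: Process): Boolean = {
  try {
    proc.exitValue() // only returns once the process has terminated
    false
  } catch {
    case _: IllegalThreadStateException => true // still running
  }
}

val child = new ProcessBuilder("sleep", "1").start()
assert(isAlive(child))
child.waitFor()
assert(!isAlive(child))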

@@ -284,7 +310,6 @@ private class ScriptTransformationWriterThread(
           }
         }
       }
-      outputStream.close()
       threwException = false
     } catch {
       case NonFatal(e) =>
@@ -295,6 +320,7 @@
         throw e
     } finally {
       try {
+        outputStream.close()
         if (proc.waitFor() != 0) {
           logError(stderrBuffer.toString) // log the stderr circular buffer
         }
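The effect of moving outputStream.close() into the finally block can be seen in miniature below (an illustrative sketch assuming a POSIX `cat` binary, not code from this PR): if writing fails partway through, the child process must still receive EOF on its stdin, otherwise the waitFor() that follows can block on a child that is still waiting for input.

import java.nio.charset.StandardCharsets.UTF_8

val proc = new ProcessBuilder("cat").start()
val stdin = proc.getOutputStream
try {
  stdin.write("some input\n".getBytes(UTF_8))
} finally {
  stdin.close() // always deliver EOF, even if write() threw
}
val exitCode = proc.waitFor() // returns because `cat` exits once it sees EOF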
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.scalatest.exceptions.TestFailedException

-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -109,6 +109,22 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     }
     assert(e.getMessage().contains("intentional exception"))
   }
+
+  test("SPARK-14400 script transformation should fail for bad script command") {
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformation(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+  }
 }

 private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {