@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
3232import org .apache .hadoop .hive .serde2 .objectinspector ._
3333import org .apache .hadoop .io .Writable
3434
35- import org .apache .spark .TaskContext
35+ import org .apache .spark .{ SparkException , TaskContext }
3636import org .apache .spark .internal .Logging
3737import org .apache .spark .rdd .RDD
3838import org .apache .spark .sql .catalyst .{CatalystTypeConverters , InternalRow }
@@ -127,45 +127,71 @@ case class ScriptTransformation(
127127 }
128128 val mutableRow = new SpecificMutableRow (output.map(_.dataType))
129129
130+ private def checkFailureAndPropagate (cause : Throwable = null ): Unit = {
131+ if (writerThread.exception.isDefined) {
132+ throw writerThread.exception.get
133+ }
134+
135+ // Checks if the proc is still alive (incase the command ran was bad)
136+ // The ideal way to do this is to use Java 8's Process#isAlive()
137+ // but it cannot be used because Spark still supports Java 7.
138+ // Following is a workaround used to check if a process is alive in Java 7
139+ // TODO: Once builds are switched to Java 8, this can be changed
140+ try {
141+ val exitCode = proc.exitValue()
142+ if (exitCode != 0 ) {
143+ logError(stderrBuffer.toString) // log the stderr circular buffer
144+ throw new SparkException (s " Subprocess exited with status $exitCode. " +
145+ s " Error: ${stderrBuffer.toString}" , cause)
146+ }
147+ } catch {
148+ case _ : IllegalThreadStateException =>
149+ // This means that the process is still alive. Move ahead
150+ }
151+ }
152+
130153 override def hasNext : Boolean = {
131- if (outputSerde == null ) {
132- if (curLine == null ) {
133- curLine = reader.readLine()
154+ try {
155+ if (outputSerde == null ) {
134156 if (curLine == null ) {
135- if (writerThread.exception.isDefined) {
136- throw writerThread.exception.get
157+ curLine = reader.readLine()
158+ if (curLine == null ) {
159+ checkFailureAndPropagate()
160+ return false
137161 }
138- false
139- } else {
140- true
141162 }
142- } else {
143- true
144- }
145- } else if (scriptOutputWritable == null ) {
146- scriptOutputWritable = reusedWritableObject
163+ } else if (scriptOutputWritable == null ) {
164+ scriptOutputWritable = reusedWritableObject
147165
148- if (scriptOutputReader != null ) {
149- if (scriptOutputReader.next(scriptOutputWritable) <= 0 ) {
150- writerThread.exception.foreach(throw _)
151- false
166+ if (scriptOutputReader != null ) {
167+ if (scriptOutputReader.next(scriptOutputWritable) <= 0 ) {
168+ checkFailureAndPropagate()
169+ return false
170+ }
152171 } else {
153- true
154- }
155- } else {
156- try {
157- scriptOutputWritable.readFields(scriptOutputStream)
158- true
159- } catch {
160- case _ : EOFException =>
161- if (writerThread.exception.isDefined) {
162- throw writerThread.exception.get
163- }
164- false
172+ try {
173+ scriptOutputWritable.readFields(scriptOutputStream)
174+ } catch {
175+ case _ : EOFException =>
176+ // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted.
177+ // Ideally the proc should *not* be alive at this point but
178+ // there can be a lag between EOF being written out and the process
179+ // being terminated. So explicitly waiting for the process to be done.
180+ proc.waitFor()
181+ checkFailureAndPropagate()
182+ return false
183+ }
165184 }
166185 }
167- } else {
186+
168187 true
188+ } catch {
189+ case NonFatal (e) =>
190+ // If this exception is due to abrupt / unclean termination of `proc`,
191+ // then detect it and propagate a better exception message for end users
192+ checkFailureAndPropagate(e)
193+
194+ throw e
169195 }
170196 }
171197
@@ -284,7 +310,6 @@ private class ScriptTransformationWriterThread(
284310 }
285311 }
286312 }
287- outputStream.close()
288313 threwException = false
289314 } catch {
290315 case NonFatal (e) =>
@@ -295,6 +320,7 @@ private class ScriptTransformationWriterThread(
295320 throw e
296321 } finally {
297322 try {
323+ outputStream.close()
298324 if (proc.waitFor() != 0 ) {
299325 logError(stderrBuffer.toString) // log the stderr circular buffer
300326 }
0 commit comments